NestedColumnAliasing.scala
@@ -118,10 +118,12 @@ object NestedColumnAliasing {
       case _ => false
     }
 
+    // Note that when we group by extractors with their references, we should remove
+    // cosmetic variations.
     val exclusiveAttrSet = AttributeSet(exclusiveAttrs ++ otherRootReferences)
     val aliasSub = nestedFieldReferences.asInstanceOf[Seq[ExtractValue]]
       .filter(!_.references.subsetOf(exclusiveAttrSet))
-      .groupBy(_.references.head)
+      .groupBy(_.references.head.canonicalized.asInstanceOf[Attribute])
       .flatMap { case (attr, nestedFields: Seq[ExtractValue]) =>
         // Each expression can contain multiple nested fields.
         // Note that we keep the original names to deliver to parquet in a case-sensitive way.
@@ -132,9 +134,12 @@ object NestedColumnAliasing {

         // If all nested fields of `attr` are used, we don't need to introduce new aliases.
         // By default, ColumnPruning rule uses `attr` already.
+        // Note that we need to remove cosmetic variations first, so we only count a
+        // nested field once.
         if (nestedFieldToAlias.nonEmpty &&
-          nestedFieldToAlias
-            .map { case (nestedField, _) => totalFieldNum(nestedField.dataType) }
+          nestedFields.map(_.canonicalized)
+            .distinct
+            .map { nestedField => totalFieldNum(nestedField.dataType) }
             .sum < totalFieldNum(attr.dataType)) {
           Some(attr.exprId -> nestedFieldToAlias)
         } else {
Inline review thread on the `nestedFields.map(_.canonicalized)` line:

Member: Is this the main difference from 3.0, `dedupNestedFields` -> `nestedFields`?

Member Author: Yeah, for the change in NestedColumnAliasing. Another difference is the tests: one test in the master branch cannot pass in branch-3.0.

Member: Yes. The test part looked correct because it's a subset. This part looks a little different and needs more validation.

Member Author: Thanks for checking. Yes, there is a bit of a difference between master and branch-3.0 here, so there is no `dedupNestedFields` in branch-3.0.

Member: Ah, I missed this part. I think the added test still fails if we don't have this change. Is this correct?

Member (@dongjoon-hyun, Jul 7, 2020): Yes, it does. The new test case still validates this patch in terms of that part.

Member Author: Yes, this test fails in the current branch-3.0.

Member: Nice.
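To make the effect of canonicalization concrete, here is a minimal standalone sketch, my own illustration rather than part of the patch. It assumes spark-catalyst 3.0.x on the classpath and uses Catalyst internals (`AttributeReference`, `GetStructField`, `canonicalized`); `leafFieldCount` is a hypothetical stand-in for the optimizer's private `totalFieldNum` helper.

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, GetStructField}
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

object CosmeticVariationSketch extends App {
  val person = StructType(Seq(
    StructField("first", StringType),
    StructField("middle", StringType)))

  // One logical attribute referenced twice, differing only cosmetically (name case).
  // withName keeps the exprId, so both variants point at the same underlying column.
  val attr = AttributeReference("name", person)()
  val attrVariant = attr.withName("NAME")

  val extractors = Seq(
    GetStructField(attr, 0, Some("first")),
    GetStructField(attrVariant, 0, Some("first")))

  // Without canonicalization the cosmetic variants land in separate groups ...
  println(extractors.groupBy(_.references.head).size) // 2
  // ... with it they collapse into one group, as in the patched groupBy.
  println(extractors.groupBy(_.references.head.canonicalized.asInstanceOf[Attribute]).size) // 1

  // Hypothetical stand-in for the optimizer's private totalFieldNum:
  // counts the leaf fields of a data type.
  def leafFieldCount(dt: DataType): Int = dt match {
    case s: StructType => s.map(f => leafFieldCount(f.dataType)).sum
    case _ => 1
  }

  // Counting duplicates makes it look like both leaf fields of `attr` are used,
  // so no alias would be introduced and nested pruning would be skipped ...
  println(extractors.map(e => leafFieldCount(e.dataType)).sum) // 2, not < 2
  // ... deduplicating by canonicalized form counts the field once, and 1 < 2 holds.
  println(extractors.map(_.canonicalized).distinct
    .map(e => leafFieldCount(e.dataType)).sum) // 1
}
```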
SchemaPruningSuite.scala
@@ -301,6 +301,18 @@ abstract class SchemaPruningSuite
     checkAnswer(query, Row("Y.", 1) :: Row("X.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil)
   }
 
+  testSchemaPruning("SPARK-32163: nested pruning should work even with cosmetic variations") {
+    withTempView("contact_alias") {
+      sql("select * from contacts")
+        .select(explode(col("friends.first")), col("friends"))
+        .createOrReplaceTempView("contact_alias")
+
+      val query = sql("select friends.middle, col from contact_alias")
+      checkScan(query, "struct<friends:array<struct<first:string,middle:string>>>")
+      checkAnswer(query, Row(Array("Z."), "Susan") :: Nil)
+    }
+  }
+
   protected def testSchemaPruning(testName: String)(testThunk: => Unit): Unit = {
     test(s"Spark vectorized reader - without partition data column - $testName") {
       withSQLConf(vectorizedReaderEnabledKey -> "true") {
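To see the scenario end to end outside the suite, here is a minimal self-contained sketch, assuming a local Spark 3.x build that includes this patch; the `/tmp/contacts` path, the `Name`/`Contact` case classes, and the `NestedPruningRepro` object are made up to mirror the suite's `contacts` data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

// Hypothetical data classes mirroring the shape of the suite's `contacts` table.
case class Name(first: String, middle: String)
case class Contact(friends: Seq[Name])

object NestedPruningRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]").appName("SPARK-32163 repro").getOrCreate()
    import spark.implicits._

    // Write a tiny parquet table so schema pruning has a file source to act on.
    Seq(Contact(Seq(Name("Susan", "Z."))))
      .toDS().write.mode("overwrite").parquet("/tmp/contacts")

    // The explode introduces a second, cosmetically different reference to `friends`.
    spark.read.parquet("/tmp/contacts")
      .select(explode(col("friends.first")), col("friends"))
      .createOrReplaceTempView("contact_alias")

    val query = spark.sql("select friends.middle, col from contact_alias")
    // With the patch, the plan's ReadSchema should list only `first` and `middle`,
    // matching the checkScan assertion in the test above.
    query.explain()
    query.show()
    spark.stop()
  }
}
```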