Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,11 @@ object NestedColumnAliasing {
*
* 1. ExtractValue -> Alias: A new alias is created for each nested field.
* 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing it.
*
* @param exprList a sequence of expressions that possibly access nested fields.
* @param skipAttrs a set of attributes we do not want to replace nested fields within.
*/
def getAliasSubMap(exprList: Seq[Expression])
def getAliasSubMap(exprList: Seq[Expression], skipAttrs: AttributeSet = AttributeSet.empty)
: Option[(Map[ExtractValue, Alias], Map[ExprId, Seq[Alias]])] = {
val (nestedFieldReferences, otherRootReferences) =
exprList.flatMap(collectRootReferenceAndExtractValue).partition {
Expand All @@ -116,7 +119,10 @@ object NestedColumnAliasing {
}

val aliasSub = nestedFieldReferences.asInstanceOf[Seq[ExtractValue]]
.filter(!_.references.subsetOf(AttributeSet(otherRootReferences)))
.filter { nestedRef =>
!nestedRef.references.subsetOf(AttributeSet(otherRootReferences)) &&
!nestedRef.references.subsetOf(skipAttrs)
}
.groupBy(_.references.head)
.flatMap { case (attr, nestedFields: Seq[ExtractValue]) =>
// Each expression can contain multiple nested fields.
Expand Down Expand Up @@ -179,7 +185,10 @@ object GeneratorNestedColumnAliasing {

case g: Generate if SQLConf.get.nestedSchemaPruningEnabled &&
canPruneGenerator(g.generator) =>
NestedColumnAliasing.getAliasSubMap(g.generator.children).map {
// For the child outputs required by the operator on top of `Generate`, we do not want
// to prune it.
val requiredAttrs = AttributeSet(g.requiredChildOutput)
NestedColumnAliasing.getAliasSubMap(g.generator.children, requiredAttrs).map {
Comment on lines +190 to +191
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case normally should be treated by above case pattern (Project + Generate). But if all nested fields are selected at top Project, the above case won't prune. Then when Optimizer transforms down to the underlying Generate, only the referred nested column are kept and others are pruned from the child. It causes the accessors at top Project unresolved.

case (nestedFieldToAlias, attrToAliases) =>
pruneGenerate(g, nestedFieldToAlias, attrToAliases)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,14 @@ abstract class SchemaPruningSuite
}
}

testSchemaPruning("select explode of nested field of array of struct and " +
"all remaining nested fields") {
Copy link
Member

@gatorsmile gatorsmile Feb 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of fixing case by case, can we try to find all the possible cases and ensure we can cover all the possible query plans? Includes negative and positive cases.

Also, we need to have the unit test cases for these optimizer rules.

val query = spark.table("contacts")
.select(explode(col("friends.first")), col("friends.middle"), col("friends.last"))
checkScan(query, "struct<friends:array<struct<first:string,middle:string,last:string>>>")
checkAnswer(query, Row("Susan", Array("Z."), Array("Smith")) :: Nil)
}

protected def testSchemaPruning(testName: String)(testThunk: => Unit): Unit = {
test(s"Spark vectorized reader - without partition data column - $testName") {
withSQLConf(vectorizedReaderEnabledKey -> "true") {
Expand Down