Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,53 @@ object NestedColumnAliasing {
case MapType(keyType, valueType, _) => totalFieldNum(keyType) + totalFieldNum(valueType)
case _ => 1 // UDT and others
}
}

/**
* This prunes unnessary nested columns from `Generate` and optional `Project` on top
* of it.
*/
object GeneratorNestedColumnAliasing {
def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
// Either `nestedPruningOnExpressions` or `nestedSchemaPruningEnabled` is enabled, we
// need to prune nested columns through Project and under Generate. The difference is
// when `nestedSchemaPruningEnabled` is on, nested columns will be pruned further at
// file format readers if it is supported.
case Project(projectList, g: Generate) if (SQLConf.get.nestedPruningOnExpressions ||
SQLConf.get.nestedSchemaPruningEnabled) && canPruneGenerator(g.generator) =>
// On top on `Generate`, a `Project` that might have nested column accessors.
// We try to get alias maps for both project list and generator's children expressions.
NestedColumnAliasing.getAliasSubMap(projectList ++ g.generator.children).map {
case (nestedFieldToAlias, attrToAliases) =>
val newChild = pruneGenerate(g, nestedFieldToAlias, attrToAliases)
Project(NestedColumnAliasing.getNewProjectList(projectList, nestedFieldToAlias), newChild)
}

case g: Generate if SQLConf.get.nestedSchemaPruningEnabled &&
canPruneGenerator(g.generator) =>
NestedColumnAliasing.getAliasSubMap(g.generator.children).map {
case (nestedFieldToAlias, attrToAliases) =>
pruneGenerate(g, nestedFieldToAlias, attrToAliases)
}

case _ =>
None
}

private def pruneGenerate(
g: Generate,
nestedFieldToAlias: Map[ExtractValue, Alias],
attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
val newGenerator = g.generator.transform {
case f: ExtractValue if nestedFieldToAlias.contains(f) =>
nestedFieldToAlias(f).toAttribute
}.asInstanceOf[Generator]

// Defer updating `Generate.unrequiredChildIndex` to next round of `ColumnPruning`.
val newGenerate = g.copy(generator = newGenerator)

NestedColumnAliasing.replaceChildrenWithAliases(newGenerate, attrToAliases)
}

/**
* This is a while-list for pruning nested fields at `Generator`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,31 +598,24 @@ object ColumnPruning extends Rule[LogicalPlan] {
s.copy(child = prunedChild(child, s.references))

// prune unrequired references
case p @ Project(_, g: Generate) if p.references != g.outputSet =>
val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
val newChild = prunedChild(g.child, requiredAttrs)
val unrequired = g.generator.references -- p.references
val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
.map(_._2)
p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices))

// prune unrequired nested fields
case p @ Project(projectList, g: Generate) if SQLConf.get.nestedPruningOnExpressions &&
NestedColumnAliasing.canPruneGenerator(g.generator) =>
NestedColumnAliasing.getAliasSubMap(projectList ++ g.generator.children).map {
case (nestedFieldToAlias, attrToAliases) =>
val newGenerator = g.generator.transform {
case f: ExtractValue if nestedFieldToAlias.contains(f) =>
nestedFieldToAlias(f).toAttribute
}.asInstanceOf[Generator]

// Defer updating `Generate.unrequiredChildIndex` to next round of `ColumnPruning`.
val newGenerate = g.copy(generator = newGenerator)

val newChild = NestedColumnAliasing.replaceChildrenWithAliases(newGenerate, attrToAliases)

Project(NestedColumnAliasing.getNewProjectList(projectList, nestedFieldToAlias), newChild)
}.getOrElse(p)
case p @ Project(_, g: Generate) =>
val currP = if (p.references != g.outputSet) {
val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
val newChild = prunedChild(g.child, requiredAttrs)
val unrequired = g.generator.references -- p.references
val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
.map(_._2)
p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices))
} else {
p
}
// If we can prune nested column on Project + Generate, do it now.
// Otherwise by transforming down to Generate, it could be pruned individually,
// and causes nested column on top Project unable to resolve.
GeneratorNestedColumnAliasing.unapply(currP).getOrElse(currP)

// prune unrequired nested fields from `Generate`.
case GeneratorNestedColumnAliasing(p) => p

// Eliminate unneeded attributes from right side of a Left Existence Join.
case j @ Join(_, right, LeftExistence(_), _, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,38 @@ abstract class SchemaPruningSuite
checkAnswer(query, Row("Y.", 1) :: Row("X.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil)
}

testSchemaPruning("select explode of nested field of array of struct") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the reason why we did not capture the bug is our tests are not well designed and reviewed.

We have to be super careful when we review the tests and then it will be much easier to find the bugs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching it and pinging me. Let me look at it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened #27503 to fix it.

// Config combinations
val configs = Seq((true, true), (true, false), (false, true), (false, false))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!


configs.foreach { case (nestedPruning, nestedPruningOnExpr) =>
withSQLConf(
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> nestedPruning.toString,
SQLConf.NESTED_PRUNING_ON_EXPRESSIONS.key -> nestedPruningOnExpr.toString) {
val query1 = spark.table("contacts")
.select(explode(col("friends.first")))
if (nestedPruning) {
// If `NESTED_SCHEMA_PRUNING_ENABLED` is enabled,
// even disabling `NESTED_PRUNING_ON_EXPRESSIONS`,
// nested schema is still pruned at scan node.
checkScan(query1, "struct<friends:array<struct<first:string>>>")
} else {
checkScan(query1, "struct<friends:array<struct<first:string,middle:string,last:string>>>")
}
checkAnswer(query1, Row("Susan") :: Nil)

val query2 = spark.table("contacts")
.select(explode(col("friends.first")), col("friends.middle"))
if (nestedPruning) {
checkScan(query2, "struct<friends:array<struct<first:string,middle:string>>>")
} else {
checkScan(query2, "struct<friends:array<struct<first:string,middle:string,last:string>>>")
}
checkAnswer(query2, Row("Susan", Array("Z.")) :: Nil)
}
}
}

protected def testSchemaPruning(testName: String)(testThunk: => Unit): Unit = {
test(s"Spark vectorized reader - without partition data column - $testName") {
withSQLConf(vectorizedReaderEnabledKey -> "true") {
Expand Down