[SPARK-40086][SPARK-42049][SQL] Improve AliasAwareOutputPartitioning and AliasAwareQueryOutputOrdering to take all aliases into account #37525
Changes from all commits: 1f5341e, 5fe4fa6, b693b31, 5e8b084, 7a57a29, 0138ad4, cf75540, 3f84edd, da1d48a, 7747045, e6c395e, 733ecb5, 1f1f093
@@ -0,0 +1,120 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.catalyst.plans

import scala.collection.mutable

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Empty2Null, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.internal.SQLConf

/**
 * A trait that provides functionality to handle aliases in the `outputExpressions`.
 */
trait AliasAwareOutputExpression extends SQLConfHelper {
  protected val aliasCandidateLimit = conf.getConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT)
  protected def outputExpressions: Seq[NamedExpression]

  /**
   * This method can be used to strip expression which does not affect the result, for example:
   * strip the expression which is ordering agnostic for output ordering.
   */
  protected def strip(expr: Expression): Expression = expr

  // Build an `Expression` -> `Attribute` alias map.
  // There can be multiple alias defined for the same expressions but it doesn't make sense to store
  // more than `aliasCandidateLimit` attributes for an expression. In those cases the old logic
  // handled only the last alias so we need to make sure that we give precedence to that.
  // If the `outputExpressions` contain simple attributes we need to add those too to the map.
  private lazy val aliasMap = {
    val aliases = mutable.Map[Expression, mutable.ArrayBuffer[Attribute]]()
    outputExpressions.reverse.foreach {
      case a @ Alias(child, _) =>
        val buffer = aliases.getOrElseUpdate(strip(child).canonicalized, mutable.ArrayBuffer.empty)
        if (buffer.size < aliasCandidateLimit) {
          buffer += a.toAttribute
        }
      case _ =>
    }
    outputExpressions.foreach {
      case a: Attribute if aliases.contains(a.canonicalized) =>
Review thread on the attribute branch above:

- Contributor: What does this do? Do you mean …
- Contributor (author): If we have …
- Contributor: What's the behavior of …
- Contributor (author): This case doesn't match. I think your code would add all attributes, but that is not needed.
- Contributor: Why …
- Contributor: Ah, I see it. How about this case: …
- Contributor (author): No. If …
- Contributor: But the aliasMap is not empty due to `c2 as x`. For this case, how can we preserve `c1` if `c1` is not added into the aliasMap?
- Contributor: Oh, I see it, nvm. @peter-toth Thank you for the patience!
- Contributor (author): Np, thanks for reviewing this PR @ulysses-you!
        val buffer = aliases(a.canonicalized)
        if (buffer.size < aliasCandidateLimit) {
          buffer += a
        }
      case _ =>
    }
    aliases
  }

  protected def hasAlias: Boolean = aliasMap.nonEmpty

  /**
   * Return a stream of expressions in which the original expression is projected with `aliasMap`.
   */
  protected def projectExpression(expr: Expression): Stream[Expression] = {
    val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
    expr.multiTransformDown {
      // Mapping with aliases
      case e: Expression if aliasMap.contains(e.canonicalized) =>
        aliasMap(e.canonicalized).toSeq ++ (if (e.containsChild.nonEmpty) Seq(e) else Seq.empty)

      // Prune if we encounter an attribute that we can't map and it is not in output set.
      // This prune will go up to the closest `multiTransformDown()` call and returns `Stream.empty`
      // there.
      case a: Attribute if !outputSet.contains(a) => Seq.empty
    }
  }
}

/**
 * A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that
 * satisfies ordering requirements.
 */
trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
  extends AliasAwareOutputExpression { self: QueryPlan[T] =>
  protected def orderingExpressions: Seq[SortOrder]

  override protected def strip(expr: Expression): Expression = expr match {
    case e: Empty2Null => strip(e.child)
    case _ => expr
  }

  override final def outputOrdering: Seq[SortOrder] = {
    if (hasAlias) {
      // Take the first `SortOrder`s only until they can be projected.
      // E.g. we have child ordering `Seq(SortOrder(a), SortOrder(b))` then
      // if only `a AS x` can be projected then we can return `Seq(SortOrder(x))`
      // but if only `b AS y` can be projected we can't return `Seq(SortOrder(y))`.
      orderingExpressions.iterator.map { sortOrder =>
        val orderingSet = mutable.Set.empty[Expression]
        val sameOrderings = sortOrder.children.toStream
          .flatMap(projectExpression)
          .filter(e => orderingSet.add(e.canonicalized))
          .take(aliasCandidateLimit)
        if (sameOrderings.nonEmpty) {
          Some(sortOrder.copy(child = sameOrderings.head,
            sameOrderExpressions = sameOrderings.tail))
        } else {
          None
        }
      }.takeWhile(_.isDefined).flatten.toSeq
    } else {
      orderingExpressions
    }
  }
}
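A rough end-to-end sketch of what the ordering side of this change is meant to enable, written against the DataFrame API (the session setup is assumed and the exact plan output may vary by build):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("alias-ordering-sketch").getOrCreate()
import spark.implicits._

// Sort by `a`, then rename `a` twice in a projection.
val sorted = spark.range(10).toDF("a")
  .sort("a")
  .select($"a".as("x"), $"a".as("y"))

// Previously only the last alias was tracked, so the ordering could be reported as lost.
// With this change the plan's outputOrdering should reference `x` (with `y` as a
// same-order expression), so a later sort on either column should not need an extra sort.
sorted.sort("y").explain()
```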
@@ -435,6 +435,16 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val EXPRESSION_PROJECTION_CANDIDATE_LIMIT =
    buildConf("spark.sql.optimizer.expressionProjectionCandidateLimit")
      .doc("The maximum number of the candidate of output expressions whose alias are replaced." +
        " It can preserve the output partitioning and ordering." +
        " Negative value means disable this optimization.")
      .internal()
      .version("3.4.0")
Review thread on the version:

- Contributor: This PR targets master, which is …
- Contributor: I'd like to merge it to 3.4 as it fixes a bug in planned write, which is a new feature in 3.4.
      .intConf
      .createWithDefault(100)

  val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
    .doc("When set to true Spark SQL will automatically select a compression codec for each " +
      "column based on statistics of the data.")
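A quick, hypothetical illustration of tuning the new internal config `spark.sql.optimizer.expressionProjectionCandidateLimit` from a session (the non-default values below are only for demonstration):

```scala
// Allow more alias candidates per expression than the default of 100.
spark.conf.set("spark.sql.optimizer.expressionProjectionCandidateLimit", "500")

// Per the doc string above, a negative value disables the alias-aware propagation.
spark.conf.set("spark.sql.optimizer.expressionProjectionCandidateLimit", "-1")
```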
@@ -16,52 +16,42 @@
 */
package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning}
import scala.collection.mutable

/**
 * A trait that provides functionality to handle aliases in the `outputExpressions`.
 */
trait AliasAwareOutputExpression extends UnaryExecNode {
  protected def outputExpressions: Seq[NamedExpression]

  private lazy val aliasMap = outputExpressions.collect {
    case a @ Alias(child, _) => child.canonicalized -> a.toAttribute
  }.toMap

  protected def hasAlias: Boolean = aliasMap.nonEmpty

  protected def normalizeExpression(exp: Expression): Expression = {
    exp.transformDown {
      case e: Expression => aliasMap.getOrElse(e.canonicalized, e)
    }
  }
}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.{AliasAwareOutputExpression, AliasAwareQueryOutputOrdering}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning}

/**
 * A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` that
 * satisfies distribution requirements.
 */
trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
  with AliasAwareOutputExpression {
  final override def outputPartitioning: Partitioning = {
    val normalizedOutputPartitioning = if (hasAlias) {
      child.outputPartitioning match {
    if (hasAlias) {
      flattenPartitioning(child.outputPartitioning).flatMap {
        case e: Expression =>
          normalizeExpression(e).asInstanceOf[Partitioning]
        case other => other
          // We need unique partitionings but if the input partitioning is
          // `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then after
          // the projection we have 4 partitionings:
          // `HashPartitioning(Seq(a + a))`, `HashPartitioning(Seq(a + b))`,
          // `HashPartitioning(Seq(b + a))`, `HashPartitioning(Seq(b + b))`, but
          // `HashPartitioning(Seq(a + b))` is the same as `HashPartitioning(Seq(b + a))`.
          val partitioningSet = mutable.Set.empty[Expression]
          projectExpression(e)
            .filter(e => partitioningSet.add(e.canonicalized))
            .take(aliasCandidateLimit)
Review thread on lines +41 to +44:

- Contributor: Scala 2.13 allows simplifying this. It's a shame… (suggested change)
- Contributor: Hmm, I think we still need to support Scala 2.12 for now?
- Contributor: Sure, that's the shame bit.
            .asInstanceOf[Stream[Partitioning]]
Review thread on the cast:

- Contributor: Do we need to cast to …
- Contributor (author): This cast is required to avoid a compile error as …
        case o => Seq(o)
      } match {
        case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
        case Seq(p) => p
        case ps => PartitioningCollection(ps)
      }
    } else {
      child.outputPartitioning
    }

    flattenPartitioning(normalizedOutputPartitioning).filter {
      case hashPartitioning: HashPartitioning => hashPartitioning.references.subsetOf(outputSet)
      case _ => true
    } match {
      case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
      case Seq(singlePartitioning) => singlePartitioning
      case seqWithMultiplePartitionings => PartitioningCollection(seqWithMultiplePartitionings)
    }
  }

  private def flattenPartitioning(partitioning: Partitioning): Seq[Partitioning] = {

@@ -74,18 +64,5 @@ trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
  }
}

/**
 * A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that
 * satisfies ordering requirements.
 */
trait AliasAwareOutputOrdering extends AliasAwareOutputExpression {
  protected def orderingExpressions: Seq[SortOrder]

  final override def outputOrdering: Seq[SortOrder] = {
    if (hasAlias) {
      orderingExpressions.map(normalizeExpression(_).asInstanceOf[SortOrder])
    } else {
      orderingExpressions
    }
  }
}
trait OrderPreservingUnaryExecNode
  extends UnaryExecNode with AliasAwareQueryOutputOrdering[SparkPlan]
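A rough sketch of what the partitioning side is meant to achieve, using the `id -> a` / `id -> b` aliasing from the comment above (DataFrame setup assumed; the exact plan output may vary):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("alias-partitioning-sketch").getOrCreate()
import spark.implicits._

// Hash-partition by `id`, then alias `id` twice in a projection.
val df = spark.range(10)
  .repartition($"id")
  .select($"id".as("a"), $"id".as("b"))

// Expected to report something like PartitioningCollection(HashPartitioning(a),
// HashPartitioning(b)) rather than keeping only the last alias or falling back to
// UnknownPartitioning.
println(df.queryExecution.executedPlan.outputPartitioning)
```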
Follow-up discussion on limiting the alias map:

- Can we limit the size of this map as well? Now it only limits the size of each map value.
- I'm not sure we should, as it might happen that we reach the map size limit but the added aliases don't project the child's partitioning / ordering. E.g. if the child partitioning is `HashPartitioning(c)` and the projection is `c1 as c1a, ..., c100 as c100a, c as ca`, then `c as ca` is not added to the map and we would end up with `UnknownPartitioning`.
- The current config actually limits the size of the final preserved exprs. I think we should add one more config to limit the candidate size in `aliasMap`, and it could be bigger by default. This `aliasMap` may harm driver memory for wide tables.
- I'm OK with adding a new config, but `aliasMap` will never be bigger than the projection (`outputExpressions`), so is this a real concern?
- Let's leave it for now. Having fewer `Attribute`s in the map values may generate fewer alternatives, but having fewer map entries may stop the entire projection.