Commits (83)
e0e39d5
[SPARK-34079][SQL] Merging non-correlated scalar subqueries to multi-…
peter-toth Apr 21, 2021
0a7e0e2
no need for the whole plan traversal in this PR
peter-toth Apr 23, 2021
e35cdc1
Merge commit 'fdccd88c2a6dd18c9d446b63fccd5c6188ea125c' into SPARK-34…
peter-toth Apr 23, 2021
0cff7b2
add MULTI_SCALAR_SUBQUERY pattern
peter-toth Apr 23, 2021
22e833d
Merge commit '9af338cd685bce26abbc2dd4d077bde5068157b1' into SPARK-34…
peter-toth Apr 23, 2021
42add09
Merge commit '132cbf0c8c1a382f33d8d212f931f5956f85a2f9' into SPARK-34…
peter-toth Apr 30, 2021
e63111d
add some tests, add more docs
peter-toth Apr 30, 2021
c84f0ee
Merge commit '2634dbac35c5e8d5216b38fd4256f5fd059f341f' into SPARK-34…
peter-toth May 7, 2021
ee8f12a
fix test
peter-toth May 7, 2021
17fd666
rename mergePlans() to tryMergePlans()
peter-toth May 6, 2021
6134fa9
add test to cover aggregate and group expression merge
peter-toth May 7, 2021
2828345
do not merge different aggregate implementations and add test
peter-toth May 6, 2021
1f2f75c
drop MultiScalarSubquery, use ScalarSubquery(CreateStruct()) instead
peter-toth May 7, 2021
a3e84a4
refactor, add support for filter and join, add new tests, add…
peter-toth May 16, 2021
0fe66dc
minor fixes
peter-toth May 23, 2021
100cb9c
extract common scalar subqueries
peter-toth May 28, 2021
9d8dd6b
Merge commit 'cd2ef9cb43f4e27302906ac2f605df4b66a72a2f' into SPARK-34…
peter-toth May 31, 2021
f83f22b
add and update docs
peter-toth Jun 2, 2021
41c0f0a
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Jun 20, 2021
d10a8be
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Jun 23, 2021
db34640
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Jun 28, 2021
d081885
Clean up code, add more comments
peter-toth Jun 29, 2021
e98754a
temp
peter-toth Jun 29, 2021
bb623cf
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Sep 14, 2021
ae1d84e
fix messages
peter-toth Sep 16, 2021
2eb14f1
regenerate expected plans
peter-toth Sep 16, 2021
060e4b7
add more comments
peter-toth Sep 20, 2021
d86d2c4
use Header alias type
peter-toth Sep 20, 2021
0a97c8b
Merge commit '6e8a4626117f0cb5535875f7181f56350ad4f195' into SPARK-34…
peter-toth Oct 5, 2021
61f2b34
Merge commit '8ae88d01b46d581367d0047b50fcfb65078ab972' into SPARK-34…
peter-toth Nov 9, 2021
532d05e
remove dependency on `spark.sql.execution.reuseSubquery`, remove unnec…
peter-toth Nov 16, 2021
c488377
refactor
attilapiros Nov 16, 2021
e0a7610
accept Attila's suggestion but keep the `merged` flag, minor name cha…
peter-toth Nov 17, 2021
63c3709
fix review findings
peter-toth Nov 17, 2021
dabbea4
add negative test case to general join matching where only a non-chil…
peter-toth Nov 17, 2021
4d97de5
amend a test to cover extra projects on both sides
peter-toth Nov 17, 2021
cc8690e
improve generic node merging
peter-toth Nov 17, 2021
83c78ca
minor fix
peter-toth Nov 24, 2021
3130913
fix merging logic if merging a plan into a merged plan
peter-toth Nov 28, 2021
3e8f7fa
remove unused mapAttributes()
peter-toth Nov 28, 2021
252c9b1
do not merge nondeterministic plans
peter-toth Dec 6, 2021
fa5e786
fix test and check adaptive path as well
peter-toth Dec 6, 2021
e292732
check test results
peter-toth Dec 6, 2021
963c423
check for same instance of subqueries
peter-toth Dec 6, 2021
9efaf2a
use the new `isCorrelated()`
peter-toth Dec 10, 2021
96a502d
move deterministic check as early as possible
peter-toth Dec 10, 2021
5b91d61
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Mar 2, 2022
8bcf515
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Mar 16, 2022
87ba289
fix comments
peter-toth Mar 18, 2022
6d5a124
rephrase general node merging
peter-toth Mar 18, 2022
851ca29
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Mar 24, 2022
96d0cab
use CTE nodes
peter-toth Mar 23, 2022
a57ed32
no need for extra shuffle with subquery `CTERelationDef`s
peter-toth Mar 24, 2022
0b34d83
regenerate expected plan stability output
peter-toth Mar 24, 2022
4985d43
remove obsolete assert
peter-toth Mar 24, 2022
de9b312
fix LogicalPlanTagInSparkPlanSuite, for logical scan plan trees consi…
peter-toth Mar 24, 2022
13a2fad
fix row-level runtime filtering as after subquery merging bloom filte…
peter-toth Mar 24, 2022
92ce6e5
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Mar 29, 2022
67ffae6
fix header scaladoc
peter-toth Apr 6, 2022
a32a85c
rename subquery flag to mergedScalarSubquery, fix CTERelationRef scal…
peter-toth Apr 6, 2022
1bc8a45
fix test name
peter-toth Apr 6, 2022
224edef
add new testcase "Merge non-correlated scalar subqueries in a subquery"
peter-toth Apr 6, 2022
a7fd1c5
add test "Merge non-correlated scalar subqueries with conflicting names"
peter-toth Apr 6, 2022
a5eb5df
add test "Merging subqueries from different places"
peter-toth Apr 6, 2022
8457148
add test "Do not merge subqueries with different join conditions", fi…
peter-toth Apr 6, 2022
1ff64e4
add test "Do not merge subqueries with different filter conditions"
peter-toth Apr 6, 2022
13a1cdb
simplify do not merge test cases
peter-toth Apr 6, 2022
4da3fe6
drop general node merging code path
peter-toth Apr 6, 2022
96ed6fd
use canonicalized form in Filter and Join condition comparison
peter-toth Apr 7, 2022
dbe81e2
simplify aggregate check
peter-toth Apr 18, 2022
ba299d5
fix aggregate grouping compare
peter-toth Apr 18, 2022
65f3425
simplify header
peter-toth Apr 18, 2022
dc5e9b9
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Apr 19, 2022
c64373b
rebase on top of https://github.com/apache/spark/pull/34929
peter-toth Apr 19, 2022
3993eab
revert regenerated q5 expected output
peter-toth Apr 19, 2022
f93283d
fix nested subqueries, add test
peter-toth Apr 19, 2022
8c5c9ac
fix comment
peter-toth Apr 19, 2022
3b7ad2c
rename method
peter-toth Apr 19, 2022
c268580
fix test name
peter-toth Apr 19, 2022
169fd6b
fix removeReferences
peter-toth Apr 19, 2022
19128ff
rename merged subquery flag in cte def
peter-toth Apr 20, 2022
1c4d14b
simplify removeReferences, fix tests
peter-toth Apr 20, 2022
2590edf
fix scala 2.13
peter-toth Apr 20, 2022
@@ -22,8 +22,9 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.trees.LeafLike
import org.apache.spark.sql.catalyst.trees.TreePattern.{EXISTS_SUBQUERY, LIST_SUBQUERY,
PLAN_EXPRESSION, SCALAR_SUBQUERY, TreePattern}
MULTI_SCALAR_SUBQUERY, PLAN_EXPRESSION, SCALAR_SUBQUERY, TreePattern}
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.BitSet

@@ -267,6 +268,33 @@ object ScalarSubquery {
}
}

/**
* A subquery that is capable of returning multiple scalar values.
*/
case class MultiScalarSubquery(
Contributor:

Do we have to create a new subquery expression? It seems like we can just use CreateNamedStruct in ScalarSubquery.plan
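A hedged sketch of that suggestion, assuming resolved aggregate expressions `avgExpr` and `sumExpr` and a merged `aggregatePlan` (none of these names are from this PR): the merged output is wrapped into a single named-struct column so that a plain ScalarSubquery can carry it:

```scala
// Hedged sketch: wrap the merged aggregate's outputs into one struct column.
// `avgExpr`, `sumExpr` and `aggregatePlan` are assumed, not from this PR.
val structPlan = Project(
  Seq(Alias(
    CreateNamedStruct(Seq(
      Literal("avg_a"), avgExpr,
      Literal("sum_b"), sumExpr)),
    "mergedValue")()),
  aggregatePlan)
// A plain ScalarSubquery can then carry the struct; callers read the fields
// back with GetStructField, so no new subquery expression type is needed.
val merged = ScalarSubquery(structPlan)
```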

Contributor Author (peter-toth):

I think you are right.

Contributor Author (peter-toth):

Dropped `MultiScalarSubquery` in 1f2f75c; will change the docs and the PR description soon.

plan: LogicalPlan,
exprId: ExprId = NamedExpression.newExprId)
extends SubqueryExpression(plan, Seq.empty, exprId) with LeafLike[Expression] with Unevaluable {
override def dataType: DataType = {
assert(plan.schema.nonEmpty, "Multi-column scalar subquery should have columns")
plan.schema
}

override def nullable: Boolean = true

override def withNewPlan(plan: LogicalPlan): MultiScalarSubquery = copy(plan = plan)

override def toString: String = s"multi-scalar-subquery#${exprId.id}"

override lazy val canonicalized: Expression = {
MultiScalarSubquery(
plan.canonicalized,
ExprId(0))
}

final override def nodePatternsInternal: Seq[TreePattern] = Seq(MULTI_SCALAR_SUBQUERY)
}
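For orientation, a hedged sketch of how a consumer reads individual values back out of this expression (`mergedPlan` is an assumed merged subquery plan; the ordinal-based GetStructField access mirrors the MergeScalarSubqueries rule below):

```scala
// `mergedPlan` is an assumed LogicalPlan produced by merging two subqueries,
// e.g. with output columns (avg(a), sum(b)). dataType is its full schema
// (a StructType), so each original scalar value is read back by ordinal.
val mss = MultiScalarSubquery(mergedPlan)
val first = GetStructField(mss, 0)  // e.g. the original avg(a) value
val second = GetStructField(mss, 1) // e.g. the original sum(b) value
```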

/**
* A [[ListQuery]] expression defines the query which we want to search in an IN subquery
* expression. It should and can only be used in conjunction with an IN expression.
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LeafNode, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{MULTI_SCALAR_SUBQUERY, SCALAR_SUBQUERY}

/**
* This rule tries to merge multiple non-correlated [[ScalarSubquery]]s into a
* [[MultiScalarSubquery]] to compute multiple scalar values at once.
*
* The process is the following:
* - While traversing the plan, we try to merge each [[ScalarSubquery]] plan into the cache of
* already seen subquery plans. If the merge succeeds, the cache is updated with the merged
* subquery plan; if not, the new subquery plan is added to the cache.
* - The original [[ScalarSubquery]] expression is replaced with a reference pointing to its cached
* version in this form: `GetStructField(MultiScalarSubquery(SubqueryReference(...)))`.
* - A second traversal checks if a [[SubqueryReference]] points to a subquery plan that
* returns multiple values, and either replaces only the [[SubqueryReference]] with the cached
* plan or restores the whole expression to its original [[ScalarSubquery]] form.
* - The [[ReuseSubquery]] rule makes sure that merged subqueries are computed only once.
*
* E.g. the following query:
*
* SELECT
* (SELECT avg(a) FROM t GROUP BY b),
* (SELECT sum(b) FROM t GROUP BY b)
*
* is optimized from:
*
* Project [scalar-subquery#231 [] AS scalarsubquery()#241,
* scalar-subquery#232 [] AS scalarsubquery()#242L]
* : :- Aggregate [b#234], [avg(a#233) AS avg(a)#236]
* : : +- Relation default.t[a#233,b#234] parquet
* : +- Aggregate [b#240], [sum(b#240) AS sum(b)#238L]
* : +- Project [b#240]
* : +- Relation default.t[a#239,b#240] parquet
@sigmod (Contributor), May 6, 2021:

Is it possible to rewrite them into a single scalar subquery using struct, e.g.,

SELECT
(SELECT avg(a) FROM t ),
(SELECT sum(b) FROM t )
FROM R

=>

SELECT st.avg_a, st.sum_b
FROM (
SELECT (SELECT STRUCT(avg(a) AS avg_a, sum(b) AS sum_b) FROM t) AS st
FROM R
)

This way,

  • (1) you don't need to rely on ReuseExchangeAndSubquery;
  • (2) the rewrite can then work for any subqueries, regardless of uncorrelated vs. correlated, reading from tables vs. reading from an array column?

Contributor Author (peter-toth):

I think that is a smart way to rewrite it, but let's check a more complex example. E.g. can we rewrite

SELECT *
FROM r
JOIN r2 ON r2.x = r.x
WHERE r.y = (SELECT sum(b) FROM t) AND r2.y = (SELECT avg(b) FROM t)

? Maybe

SELECT *
FROM (
  SELECT (
    SELECT STRUCT(sum(b) AS sum_b, avg(b) AS avg_b) FROM t
  ) AS st, x, y
  FROM r
) AS r
JOIN r2 ON r2.x = r.x
WHERE r.y = r.st.sum_b AND r2.y = r.st.avg_b

? Does this work with outer joins? And isn't this more complex than the reuse way in this PR?

I was also thinking about "whole plan subquery merge" (similar to my "whole plan reuse" suggestion: #28885) where subqueries at "different level" could be merged (and reused) as a possible improvement to this PR in the future.

BTW, the ReuseExchangeAndSubquery rule you mentioned is suggested in my "whole plan reuse" PR, which got stuck a bit due to lack of reviews. Do you also have a similar rule in production, or did you just see my PR? If you have some time, any feedback is appreciated there as well. :)

I didn't check how correlated subqueries could benefit from rewriting the query (this PR focuses on uncorrelated ones), but I think at this point in the optimizer those have been transformed to joins.
Can you please elaborate on the "reading from tables vs. reading from an array column" part?

@sigmod (Contributor), May 6, 2021:

> And isn't this more complex than the reuse way in this PR?

IIUC, I think this PR's complexity is in extra dependencies. The proposed rule augments two subqueries, makes them look identical, and hopes (a) column-pruning doesn't prune too aggressively and (b) physical de-dup could dedup them. In case (a) changes later and the two aggregate trees are not deduped in the physical plan, there could potentially be regressions -- each aggregation then becomes more expensive.

Therefore, I think there're two directions that you can go:

  • (1) Add a top plan node that contains the main plan tree as well as CommonSubplan trees. For each merged subquery, put it into a CommonSubplan. A node in the main plan can then reference a CommonSubplan and convert its output relation into a scalar value. This approach only works for uncorrelated cases (see the hedged sketch after this list).
  • (2) Only extract common subqueries within the same logical node, and put merged scalar subqueries into a Project node below the logical node. It may be less ambitious than what you proposed, but it at least does not have extra dependencies. This approach works for both correlated and uncorrelated cases.
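A minimal logical-plan sketch of direction (1); all names here (CommonSubplanDef, WithCommonSubplans, CommonSubplanRef) are hypothetical illustrations, not code from this PR (which later realized the idea with CTE nodes, per the "use CTE nodes" commit):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}

// One entry per merged subquery; planned and executed exactly once.
case class CommonSubplanDef(id: Long, plan: LogicalPlan)

// Top-level node owning the merged common subplans next to the main plan tree.
case class WithCommonSubplans(
    child: LogicalPlan,
    subplans: Seq[CommonSubplanDef]) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  protected def withNewChildInternal(newChild: LogicalPlan): WithCommonSubplans =
    copy(child = newChild)
}

// Leaf reference placed in the main plan wherever a merged subplan's
// (single-row) result is consumed as scalar values.
case class CommonSubplanRef(id: Long, output: Seq[Attribute]) extends LeafNode
```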

> Can you please elaborate on the "reading from tables vs. reading from an array column" part?

This is an example:

SELECT y
FROM LATERAL VIEW explode(ARRAY(ARRAY(1), ARRAY(1, 2), ARRAY(1, 2, 3))) AS y
WHERE
( SELECT COUNT(*) FROM LATERAL VIEW explode(y) AS element ) > 1
AND
( SELECT SUM(element) FROM LATERAL VIEW explode(y) AS element ) > 3

I noticed that such subqueries do not work for now. But they align with the language spec and have well-defined semantics. Once we support them, we want your proposed rule to be able to speed them up as well.

> Does this work with outer joins?

I don't think outer joins are relevant to this problem:

  • if we go with (1), it doesn't matter;
  • if we go with (2), pushing projection expressions through join/outer join is a separate topic/rule. A subquery expression doesn't have much difference from an ordinary expression in this regard.

@peter-toth (Contributor Author), May 7, 2021:

> The proposed rule augments two subqueries, makes them look identical, and hopes (a) column-pruning doesn't prune too aggressively and (b) physical de-dup could dedup them. In case (a) changes later and the two aggregate trees are not deduped in the physical plan, there could potentially be regressions -- each aggregation then becomes more expensive.

In this PR the new MergeScalarSubqueries rule runs in a separate batch after column pruning, close to the end of optimization. This is by design, to make sure no subsequent rule changes the structure of different instances of a merged subquery plan at different places in the logical plan differently. So the physical planning creates the same physical plan for these instances and there shouldn't be any dedup issues.
Update: I need to recheck this part as the current PR might not do what I wanted it to.

I think the downside of my current PR is probably that the physical planning of merged subqueries happens multiple times (as many times as they appear in the logical plan) and physical dedup comes only after that. This could be improved if we had subquery references in the logical plan as well (something like ReuseSubqueryExec). But I think that's what your (1) is about. Move the merged subqueries to a special top logical plan node and add subquery references at places where they are actually used.

SELECT y
FROM LATERAL VIEW explode(ARRAY(ARRAY(1), ARRAY(1, 2), ARRAY(1, 2, 3))) AS y
WHERE
( SELECT COUNT(*) FROM LATERAL VIEW explode(y) AS element ) > 1
AND
( SELECT SUM(element) FROM LATERAL VIEW explode(y) AS element ) > 3
> I noticed that such subqueries do not work for now. But they align with the language spec and have well-defined semantics. Once we support them, we want your proposed rule to be able to speed them up as well.

Ah ok, but what should the optimized plan of that query be? This looks like we have 2 correlated subqueries, and your (2) makes perfect sense for merging them. But I don't think we need lateral views; just take the following query:

SELECT
  (SELECT avg(a) FROM t WHERE t.a = outer.a),
  (SELECT sum(b) FROM t WHERE t.a = outer.a)
FROM t AS outer

which is

Project [scalar-subquery#231 [a#233] AS scalarsubquery(a)#243, scalar-subquery#232 [a#233] AS scalarsubquery(a)#244L]
:  :- Aggregate [avg(a#239) AS avg(a)#236]
:  :  +- Filter (a#239 = outer(a#233))
:  :     +- SubqueryAlias spark_catalog.default.t
:  :        +- Relation default.t[a#239,b#240] parquet
:  +- Aggregate [sum(b#242) AS sum(b)#238L]
:     +- Filter (a#241 = outer(a#233))
:        +- SubqueryAlias spark_catalog.default.t
:           +- Relation default.t[a#241,b#242] parquet
+- SubqueryAlias outer
   +- SubqueryAlias spark_catalog.default.t
      +- Relation default.t[a#233,b#234] parquet

which, after the correlated subqueries are rewritten to joins, currently becomes:

Project [avg(a)#236 AS scalarsubquery(a)#243, sum(b)#238L AS scalarsubquery(a)#244L]
+- Join LeftOuter, (a#241 = a#233)
   :- Project [a#233, avg(a)#236]
   :  +- Join LeftOuter, (a#239 = a#233)
   :     :- Project [a#233]
   :     :  +- Relation default.t[a#233,b#234] parquet
   :     +- Aggregate [a#239], [avg(a#239) AS avg(a)#236, a#239]
   :        +- Project [a#239]
   :           +- Filter isnotnull(a#239)
   :              +- Relation default.t[a#239,b#240] parquet
   +- Aggregate [a#241], [sum(b#242) AS sum(b)#238L, a#241]
      +- Filter isnotnull(a#241)
         +- Relation default.t[a#241,b#242] parquet

and this PR doesn't help here at all, but it could be optimized using your (2).

I wonder if the following steps (tickets/PRs) would make sense:

  1. Finish this PR and support only non-correlated subqueries. Mainly focus on merging plans and keep the physical reuse dependency for simplicity. This supports subquery merging within a plan regardless of whether the subqueries are in the same logical node.
  2. Add a performance improvement to 1. so as to physically plan a merged subquery only once. This is your (1) basically. Move the merged subqueries to a top node and introduce subquery references in the logical plan.
  3. Add support for correlated subqueries using your (2). As you mentioned this will only support subqueries within the same logical node.

Probably we should implement separate rules for 1. + 2. and 3., but the plan merging logic can be common.

@sigmod (Contributor), May 8, 2021:

> In this PR the new MergeScalarSubqueries rule runs in a separate batch after column pruning, close to the end of optimization. This is by design to make sure no subsequent rule changes the structure

I don't see how we can guarantee that. I think the hidden, inter-rule dependency can add complexities for future development and maintenance.

For instance, someone could implement a new Strategy that internally calls ColumnPruning after exploring one logical plan alternative. By the time such a Strategy is implemented, the authors wouldn't be aware of the fact that ColumnPruning should not be called after MergeScalarSubqueries.

  • First of all, such issues can hardly be detected by the author's new unit/query tests, as an effective test has to have both effective patterns to trigger the Strategy and MergeScalarSubqueries. However, such a case can happen in prod traffic;
  • Second, if they do find that's an issue, they would then have to either (a) add some hacks in the Aggregate to mark that MergeScalarSubqueries has been applied and hence ColumnPruning should not go through it, or (b) re-implement MergeScalarSubqueries per my proposal (1).

I'm wondering whether we can pursue (2) for now, if it meets your need. It's less ambitious but may address most of your issue? If you indeed have to extract subqueries over the entire tree, I don't have a clean approach in mind other than (1).

> Add a performance improvement to 1. so as to physically plan a merged subquery only once. This is your (1) basically.

The performance was not my initial concern; rather, I think we'd better make MergeScalarSubqueries self-contained so that it does not depend on an assumption that could later change.

> But I don't think we need lateral views

Subquerying over arrays is an important use case, for which we don't want to decorrelate. In this case, a subquery is more like an ordinary expression and should be evaluated within the operator node.

Contributor:

> Doesn't that mean that (2) also assumes that no subsequent transformation changes the 2 instances differently and ReuseSubquery does the dedup?

The difference is that transformation (2) is a self-contained transformation that results in a better plan, while this PR's MergeScalarSubqueries is not. If for some reason ReuseSubquery does not trigger, applying (2) does not make the plan worse than not applying (2). However, iiuc, in the same situation, MergeScalarSubqueries would make the plan worse because each aggregate pipeline runs additional computations.

Contributor Author (peter-toth):

> If for some reason ReuseSubquery does not trigger, applying (2) does not make the plan worse than not applying (2).

In the above example the final optimized plan of (2) is the very same as with this PR. There are 2 aggregates in both subqueries, so without dedup both (2) and this PR could cause regressions.
I agree that (2) is self-contained and this PR is not, but IMO it looks like there are inter-rule dependencies currently in Spark (like PushDownPredicates relying on ReuseSubquery) that overall don't make (2) safer than this PR.

I think this means that your (1) suggestion is probably the right approach and we need to move common non-correlated subqueries to a top node and reference to them in logical plan.

I also think that (2) is a good improvement for correlated subqueries, but I would pursue (1) in this PR first and maybe (2) in a separate one. Does this sound acceptable?

@cloud-fan, @maropu do you have any thoughts on this topic?

@sigmod (Contributor), May 11, 2021:

> I would pursue (1) in this PR first and maybe (2) in a separate one. Does this sound acceptable?

Yeah, that sounds great. Thanks a lot, @peter-toth!

> There are 2 aggregates in both subqueries, so without dedup both (2) and this PR could cause regressions.

IIUC, I think it sounds like an existing bug (or missing feature) for struct subfield pruning, which could be blocking (2) but is orthogonal to (2). For instance, if I write your example join query manually, I'd expect the struct subfield pruning to happen to the struct constructor, regardless of the existence of subqueries.

> I've never seen such transformations in SparkStrategies.

It's not uncommon in exploration Strategies such as index selection, common subplan dedup etc., where we substitute the subtree of a tree node T with another subtree (from somewhere else in the plan or a different access path) that may contain columns unneeded by T. Spark doesn't have those strategies for now, but I won't be surprised if some contributors add them down the road.

Contributor Author (peter-toth):

Thanks @sigmod. I will try to update the PR by end of this week.

Contributor Author (peter-toth):

Sorry, I got a bit distracted, but I have updated the PR to follow your (1) suggestion now. Please let me know your thoughts.

* +- OneRowRelation
*
* to:
*
* Project [multi-scalar-subquery#231.avg(a) AS scalarsubquery()#241,
* multi-scalar-subquery#232.sum(b) AS scalarsubquery()#242L]
* : :- Aggregate [b#234], [avg(a#233) AS avg(a)#236, sum(b#234) AS sum(b)#238L]
* : : +- Project [a#233, b#234]
* : : +- Relation default.t[a#233,b#234] parquet
* : +- Aggregate [b#234], [avg(a#233) AS avg(a)#236, sum(b#234) AS sum(b)#238L]
* : +- Project [a#233, b#234]
* : +- Relation default.t[a#233,b#234] parquet
* +- OneRowRelation
*/
object MergeScalarSubqueries extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = {
if (conf.scalarSubqueryMergeEnabled && conf.subqueryReuseEnabled) {
val mergedSubqueries = ArrayBuffer.empty[LogicalPlan]
removeReferences(mergeAndInsertReferences(plan, mergedSubqueries), mergedSubqueries)
} else {
plan
}
}

private def mergeAndInsertReferences(
plan: LogicalPlan,
mergedSubqueries: ArrayBuffer[LogicalPlan]): LogicalPlan = {
plan.transformAllExpressionsWithPruning(_.containsAnyPattern(SCALAR_SUBQUERY), ruleId) {
case s: ScalarSubquery if s.children.isEmpty =>
val (mergedPlan, ordinal) = mergeAndGetReference(s.plan, mergedSubqueries)
GetStructField(MultiScalarSubquery(mergedPlan, s.exprId), ordinal)
}
}

case class SubqueryReference(
index: Int,
mergedSubqueries: ArrayBuffer[LogicalPlan]) extends LeafNode {
override def stringArgs: Iterator[Any] = Iterator(index)

override def output: Seq[Attribute] = mergedSubqueries(index).output
}

private def mergeAndGetReference(
plan: LogicalPlan,
mergedSubqueries: ArrayBuffer[LogicalPlan]): (SubqueryReference, Int) = {
mergedSubqueries.zipWithIndex.collectFirst {
Function.unlift { case (s, i) => mergePlans(plan, s).map(_ -> i) }
}.map { case ((mergedPlan, outputMap), i) =>
mergedSubqueries(i) = mergedPlan
SubqueryReference(i, mergedSubqueries) ->
mergedPlan.output.indexOf(outputMap(plan.output.head))
}.getOrElse {
mergedSubqueries += plan
SubqueryReference(mergedSubqueries.length - 1, mergedSubqueries) -> 0
}
}

private def mergePlans(
newPlan: LogicalPlan,
existingPlan: LogicalPlan): Option[(LogicalPlan, AttributeMap[Attribute])] = {
(newPlan, existingPlan) match {
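// The two plans are identical up to canonicalization: keep the existing plan
// and map the new plan's output attributes onto the existing ones.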
case (np, ep) if np.canonicalized == ep.canonicalized =>
Some(ep -> AttributeMap(np.output.zip(ep.output)))
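// Both sides are Projects: merge their children first, then combine the two
// project lists, de-duplicated.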
case (np: Project, ep: Project) =>
mergePlans(np.child, ep.child).map { case (mergedChild, outputMap) =>
val newProjectList = replaceAttributes(np.projectList, outputMap)
val newOutputMap = createOutputMap(np.projectList, newProjectList)
Project(distinctExpressions(ep.projectList ++ newProjectList), mergedChild) ->
newOutputMap
}
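// Only the existing side has an extra Project: merge below it and extend the
// project list with the new plan's mapped output.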
case (np, ep: Project) =>
mergePlans(np, ep.child).map { case (mergedChild, outputMap) =>
Project(distinctExpressions(ep.projectList ++ outputMap.values), mergedChild) -> outputMap
}
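// Only the new side has an extra Project: merge its child with the existing
// plan and expose the existing output alongside the rewritten project list.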
case (np: Project, ep) =>
mergePlans(np.child, ep).map { case (mergedChild, outputMap) =>
val newProjectList = replaceAttributes(np.projectList, outputMap)
val newOutputMap = createOutputMap(np.projectList, newProjectList)
Project(distinctExpressions(ep.output ++ newProjectList), mergedChild) -> newOutputMap
}
Member:

Which kind of queries does this case handle? (Does this PR already have a test for this code path?) Is it safe to accept any plan node if the other side is a Project?

Member:

BTW, could we use a whitelist here to check whether it can merge plans or not?

Contributor Author (peter-toth):

A good example is in the description of this PR:

SELECT
  (SELECT avg(a) FROM t GROUP BY b),
  (SELECT sum(b) FROM t GROUP BY b)

Where we have an additional Project [b#240] in the second subquery due to column pruning:

Project [scalar-subquery#231 [] AS scalarsubquery()#241, scalar-subquery#232 [] AS scalarsubquery()#242L]
:  :- Aggregate [b#234], [avg(a#233) AS avg(a)#236]
:  :  +- Relation default.t[a#233,b#234] parquet
:  +- Aggregate [b#240], [sum(b#240) AS sum(b)#238L]
:     +- Project [b#240]
:        +- Relation default.t[a#239,b#240] parquet
+- OneRowRelation

and this rule can merge the 2 queries as:

Project [multi-scalar-subquery#231.avg(a) AS scalarsubquery()#241, multi-scalar-subquery#232.sum(b) AS scalarsubquery()#242L]
:  :- Aggregate [b#234], [avg(a#233) AS avg(a)#236, sum(b#234) AS sum(b)#238L]
:  :  +- Project [a#233, b#234]
:  :     +- Relation default.t[a#233,b#234] parquet
:  +- Aggregate [b#234], [avg(a#233) AS avg(a)#236, sum(b#234) AS sum(b)#238L]
:     +- Project [a#233, b#234]
:        +- Relation default.t[a#233,b#234] parquet
+- OneRowRelation

IMO the above 2 cases are safe; they just handle the case when there is an extra Project node in one of the plans, but the child plan under the Project and the plan of the other side are mergeable. In these cases the merged plan should contain the Project node, but it should also contain the output of the other side transparently.

I will add this test case to SubquerySuite.

case (np: Aggregate, ep: Aggregate) =>
Member:

Can we always assume that merging two (or more) aggregates makes performance better? For example, we could have two aggregates in a plan where one side is a hash aggregate and the other side is an object-hash aggregate. In this case, the merged plan node seems to be an object-hash aggregate. If this is true, this rewrite can easily cause high memory pressure.

Member:

IMHO, since this rewrite itself is not an optimization but a pre-process to reuse subqueries, it might be better to implement this logic on the ReuseSubquery side, and merge the subqueries if their physical plans are the same.

Contributor Author (peter-toth):

> Can we always assume that merging two (or more) aggregates makes performance better? For example, we could have two aggregates in a plan where one side is a hash aggregate and the other side is an object-hash aggregate. In this case, the merged plan node seems to be an object-hash aggregate. If this is true, this rewrite can easily cause high memory pressure.

Thanks, this is a very good question, let me look into this...

> IMHO, since this rewrite itself is not an optimization but a pre-process to reuse subqueries, it might be better to implement this logic on the ReuseSubquery side, and merge the subqueries if their physical plans are the same.

The reason why I implemented this feature as an Optimizer rule is that merging LogicalPlans seems much easier than merging physical ones. The example in the description has the following physical subquery plans:

*(1) Project [Subquery scalar-subquery#231, [id=#110] AS scalarsubquery()#241, Subquery scalar-subquery#232, [id=#132] AS scalarsubquery()#242L]
:  :- Subquery scalar-subquery#231, [id=#110]
:  :  +- *(2) HashAggregate(keys=[b#234], functions=[avg(a#233)], output=[avg(a)#236])
:  :     +- Exchange hashpartitioning(b#234, 5), ENSURE_REQUIREMENTS, [id=#106]
:  :        +- *(1) HashAggregate(keys=[b#234], functions=[partial_avg(a#233)], output=[b#234, sum#247, count#248L])
:  :           +- *(1) ColumnarToRow
:  :              +- FileScan parquet default.t[a#233,b#234] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/petertoth/git/apache/spark/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int>
:  +- Subquery scalar-subquery#232, [id=#132]
:     +- *(2) HashAggregate(keys=[b#240], functions=[sum(b#240)], output=[sum(b)#238L])
:        +- Exchange hashpartitioning(b#240, 5), ENSURE_REQUIREMENTS, [id=#128]
:           +- *(1) HashAggregate(keys=[b#240], functions=[partial_sum(b#240)], output=[b#240, sum#250L])
:              +- *(1) ColumnarToRow
:                 +- FileScan parquet default.t[b#240] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/petertoth/git/apache/spark/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<b:int>

Merging these 2 physical subqueries would require a much more complex mergePlans() function that can handle Exchange and Scan nodes.

Contributor Author (peter-toth):

Fixed the hash/objecthash/sorted aggregate merge issue in 2828345 and I also added a test.
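As a hedged illustration of the non-merge case (assuming a table t(a INT, b INT), an active SparkSession `spark`, and that collect_list, a TypedImperativeAggregate, plans as an object-hash aggregate while sum plans as a hash aggregate):

```scala
// Two scalar subqueries over the same table whose physical aggregate
// implementations would differ; after the fix, the rule should NOT merge them.
val df = spark.sql(
  """SELECT
    |  (SELECT sum(a) FROM t),          -- expected to plan as a hash aggregate
    |  (SELECT collect_list(b) FROM t)  -- expected to plan as an object-hash aggregate
    |""".stripMargin)
df.explain() // expect two independent scalar subqueries, no merged plan
```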

mergePlans(np.child, ep.child).flatMap { case (mergedChild, outputMap) =>
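// Two Aggregates can merge only if their grouping expressions are equivalent
// after mapping the new plan's attributes onto the merged child.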
val newGroupingExpression = replaceAttributes(np.groupingExpressions, outputMap)
if (ExpressionSet(newGroupingExpression) == ExpressionSet(ep.groupingExpressions)) {
val newAggregateExpressions = replaceAttributes(np.aggregateExpressions, outputMap)
val newOutputMap = createOutputMap(np.aggregateExpressions, newAggregateExpressions)
Member:

In the tests added in this PR, it seems this map always has the same exprId mapping, e.g., sum(a)#3 -> sum(a)#3. So, could you add more tests?

Contributor Author (peter-toth):

Sure, added a new test in 6134fa9 to cover this.

Some(Aggregate(ep.groupingExpressions,
distinctExpressions(ep.aggregateExpressions ++ newAggregateExpressions),
mergedChild) -> newOutputMap)
} else {
None
}
}
case _ =>
None
}
}

private def replaceAttributes[T <: Expression](
expressions: Seq[T],
outputMap: AttributeMap[Attribute]) = {
expressions.map(_.transform {
case a: Attribute => outputMap.getOrElse(a, a)
}.asInstanceOf[T])
}

private def createOutputMap(from: Seq[NamedExpression], to: Seq[NamedExpression]) = {
AttributeMap(from.map(_.toAttribute).zip(to.map(_.toAttribute)))
}

private def distinctExpressions(expressions: Seq[NamedExpression]) = {
ExpressionSet(expressions).toSeq.asInstanceOf[Seq[NamedExpression]]
}

private def removeReferences(
plan: LogicalPlan,
mergedSubqueries: ArrayBuffer[LogicalPlan]): LogicalPlan = {
plan.transformAllExpressionsWithPruning(_.containsAnyPattern(MULTI_SCALAR_SUBQUERY), ruleId) {
case gsf @ GetStructField(mss @ MultiScalarSubquery(sr: SubqueryReference, _), _, _) =>
val dereferencedPlan = removeReferences(mergedSubqueries(sr.index), mergedSubqueries)
if (dereferencedPlan.outputSet.size > 1) {
gsf.copy(child = mss.copy(plan = dereferencedPlan))
} else {
ScalarSubquery(dereferencedPlan, exprId = mss.exprId)
}
}
}
}
@@ -231,6 +231,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
ColumnPruning,
CollapseProject,
RemoveNoopOperators) :+
Batch("MergeScalarSubqueries", Once,
MergeScalarSubqueries) :+
// This batch must be executed after the `RewriteSubquery` batch, which creates joins.
Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers) :+
Batch("ReplaceUpdateFieldsExpression", Once, ReplaceUpdateFieldsExpression)
@@ -57,6 +57,7 @@ object RuleIdCollection {
"org.apache.spark.sql.catalyst.optimizer.NullPropagation" ::
"org.apache.spark.sql.catalyst.optimizer.OptimizeIn" ::
"org.apache.spark.sql.catalyst.optimizer.Optimizer$OptimizeSubqueries" ::
"org.apache.spark.sql.catalyst.optimizer.MergeScalarSubqueries" ::
"org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin" ::
"org.apache.spark.sql.catalyst.optimizer.PushExtraPredicateThroughJoin" ::
"org.apache.spark.sql.catalyst.optimizer.PushFoldableIntoBranches" ::
@@ -41,6 +41,7 @@ object TreePattern extends Enumeration {
val LIKE_FAMLIY: Value = Value
val LIST_SUBQUERY: Value = Value
val LITERAL: Value = Value
val MULTI_SCALAR_SUBQUERY: Value = Value
val NOT: Value = Value
val NULL_CHECK: Value = Value
val NULL_LITERAL: Value = Value
@@ -1353,6 +1353,14 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val SCALAR_SUBQUERY_MERGE_ENABLED =
buildConf("spark.sql.scalarSubqueyMerge.enabled")
Member:

spark.sql.optimizer.scalarSubqueryMerging.enabled (or spark.sql.optimizer.mergeScalarSubqueries.enabled) instead?

Member:

I think we need to describe more, e.g., "To enable this feature, spark.sql.execution.reuseSubquery needs to be true", etc.

Contributor Author (peter-toth):

I'm no longer sure we actually need this flag, as we can disable this rule in the optimizer with spark.sql.optimizer.excludedRules too.

Contributor Author (peter-toth):

I haven't removed this flag yet, but I still think it is not needed.
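For reference, a hedged sketch of disabling the rule without a dedicated flag, via the existing excludedRules mechanism (rule name as registered in this PR; `spark` is an assumed active SparkSession):

```scala
// Exclude the rule per-session instead of using a dedicated config flag.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.MergeScalarSubqueries")
```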

.internal()
.doc("When true, the planner will try to merge scalar subqueries and re-use them.")
.version("3.2.0")
.booleanConf
.createWithDefault(true)

val REMOVE_REDUNDANT_PROJECTS_ENABLED = buildConf("spark.sql.execution.removeRedundantProjects")
.internal()
.doc("Whether to remove redundant project exec node based on children's output and " +
@@ -3481,6 +3489,8 @@ class SQLConf extends Serializable with Logging {

def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED)

def scalarSubqueryMergeEnabled: Boolean = getConf(SCALAR_SUBQUERY_MERGE_ENABLED)

def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)

def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{GetStructField, MultiScalarSubquery, ScalarSubquery}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._

class MergeScalarSubqueriesSuite extends PlanTest {

private object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("MergeScalarSubqueries", Once, MergeScalarSubqueries) :: Nil
}

test("Simple non-correlated scalar subquery merge") {
val testRelation = LocalRelation('a.int, 'b.int)

val subquery1 = testRelation
.groupBy('b)(max('a))
val subquery2 = testRelation
.groupBy('b)(sum('a))
val originalQuery = testRelation
.select(ScalarSubquery(subquery1), ScalarSubquery(subquery2))

val multiSubquery = testRelation
.groupBy('b)(max('a), sum('a)).analyze
val correctAnswer = testRelation
.select(GetStructField(MultiScalarSubquery(multiSubquery), 0).as("scalarsubquery()"),
GetStructField(MultiScalarSubquery(multiSubquery), 1).as("scalarsubquery()"))

// checkAnalysis is disabled because `Analyzer` is not prepared for `MultiScalarSubquery` nodes,
// as only the `Optimizer` can insert such a node into the plan
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer, false)
}
}
@@ -73,6 +73,8 @@ trait PlanTestBase extends PredicateHelper with SQLHelper with SQLConfHelper { s
plan transformAllExpressions {
case s: ScalarSubquery =>
s.copy(plan = normalizeExprIds(s.plan), exprId = ExprId(0))
case s: MultiScalarSubquery =>
s.copy(plan = normalizeExprIds(s.plan), exprId = ExprId(0))
case e: Exists =>
e.copy(exprId = ExprId(0))
case l: ListQuery =>
@@ -24,7 +24,8 @@ import org.apache.spark.sql.catalyst.expressions.{ListQuery, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{DYNAMIC_PRUNING_SUBQUERY, IN_SUBQUERY, SCALAR_SUBQUERY}
import org.apache.spark.sql.catalyst.trees.TreePattern.{DYNAMIC_PRUNING_SUBQUERY, IN_SUBQUERY,
MULTI_SCALAR_SUBQUERY, SCALAR_SUBQUERY}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
@@ -114,7 +115,8 @@ case class InsertAdaptiveSparkPlan(
*/
private def buildSubqueryMap(plan: SparkPlan): Map[Long, BaseSubqueryExec] = {
val subqueryMap = mutable.HashMap.empty[Long, BaseSubqueryExec]
if (!plan.containsAnyPattern(SCALAR_SUBQUERY, IN_SUBQUERY, DYNAMIC_PRUNING_SUBQUERY)) {
if (!plan.containsAnyPattern(SCALAR_SUBQUERY, MULTI_SCALAR_SUBQUERY, IN_SUBQUERY,
DYNAMIC_PRUNING_SUBQUERY)) {
return subqueryMap.toMap
}
plan.foreach(_.expressions.foreach(_.foreach {
@@ -125,6 +127,13 @@
val subquery = SubqueryExec.createForScalarSubquery(
s"subquery#${exprId.id}", executedPlan)
subqueryMap.put(exprId.id, subquery)
case expressions.MultiScalarSubquery(p, exprId)
if !subqueryMap.contains(exprId.id) =>
val executedPlan = compileSubquery(p)
verifyAdaptivePlan(executedPlan, p)
val subquery = SubqueryExec.createForScalarSubquery(
s"subquery#${exprId.id}", executedPlan)
subqueryMap.put(exprId.id, subquery)
case expressions.InSubquery(_, ListQuery(query, _, exprId, _))
if !subqueryMap.contains(exprId.id) =>
val executedPlan = compileSubquery(query)
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, DynamicPruningExpression, ListQuery, Literal}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{DYNAMIC_PRUNING_SUBQUERY, IN_SUBQUERY,
SCALAR_SUBQUERY}
MULTI_SCALAR_SUBQUERY, SCALAR_SUBQUERY}
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.{BaseSubqueryExec, InSubqueryExec, SparkPlan}

@@ -30,9 +30,12 @@ case class PlanAdaptiveSubqueries(

def apply(plan: SparkPlan): SparkPlan = {
plan.transformAllExpressionsWithPruning(
_.containsAnyPattern(SCALAR_SUBQUERY, IN_SUBQUERY, DYNAMIC_PRUNING_SUBQUERY)) {
_.containsAnyPattern(SCALAR_SUBQUERY, MULTI_SCALAR_SUBQUERY, IN_SUBQUERY,
DYNAMIC_PRUNING_SUBQUERY)) {
case expressions.ScalarSubquery(_, _, exprId) =>
execution.ScalarSubquery(subqueryMap(exprId.id), exprId)
case expressions.MultiScalarSubquery(_, exprId) =>
execution.MultiScalarSubqueryExec(subqueryMap(exprId.id), exprId)
case expressions.InSubquery(values, ListQuery(_, _, exprId, _)) =>
val expr = if (values.length == 1) {
values.head