Closed
Changes from 69 commits
83 commits
e0e39d5
[SPARK-34079][SQL] Merging non-correlated scalar subqueries to multi-…
peter-toth Apr 21, 2021
0a7e0e2
no need for the whole plan traversal in this PR
peter-toth Apr 23, 2021
e35cdc1
Merge commit 'fdccd88c2a6dd18c9d446b63fccd5c6188ea125c' into SPARK-34…
peter-toth Apr 23, 2021
0cff7b2
add MULTI_SCALAR_SUBQUERY pattern
peter-toth Apr 23, 2021
22e833d
Merge commit '9af338cd685bce26abbc2dd4d077bde5068157b1' into SPARK-34…
peter-toth Apr 23, 2021
42add09
Merge commit '132cbf0c8c1a382f33d8d212f931f5956f85a2f9' into SPARK-34…
peter-toth Apr 30, 2021
e63111d
add some tests, add more docs
peter-toth Apr 30, 2021
c84f0ee
Merge commit '2634dbac35c5e8d5216b38fd4256f5fd059f341f' into SPARK-34…
peter-toth May 7, 2021
ee8f12a
fix test
peter-toth May 7, 2021
17fd666
rename mergePlans() to tryMergePlans()
peter-toth May 6, 2021
6134fa9
add test to cover aggregate and group expression merge
peter-toth May 7, 2021
2828345
do not merge different aggregate implementations and add test
peter-toth May 6, 2021
1f2f75c
drop MultiScalarSubquery, use ScalarSubquery(CreateStruct()) instead
peter-toth May 7, 2021
a3e84a4
refactor, add support for filter and join, add new tests, add…
peter-toth May 16, 2021
0fe66dc
minor fixes
peter-toth May 23, 2021
100cb9c
extract common scalar subqueries
peter-toth May 28, 2021
9d8dd6b
Merge commit 'cd2ef9cb43f4e27302906ac2f605df4b66a72a2f' into SPARK-34…
peter-toth May 31, 2021
f83f22b
add and update docs
peter-toth Jun 2, 2021
41c0f0a
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Jun 20, 2021
d10a8be
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Jun 23, 2021
db34640
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Jun 28, 2021
d081885
Clean up code, add more comments
peter-toth Jun 29, 2021
e98754a
temp
peter-toth Jun 29, 2021
bb623cf
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Sep 14, 2021
ae1d84e
fix messages
peter-toth Sep 16, 2021
2eb14f1
regenerate expected plans
peter-toth Sep 16, 2021
060e4b7
add more comments
peter-toth Sep 20, 2021
d86d2c4
use Header alias type
peter-toth Sep 20, 2021
0a97c8b
Merge commit '6e8a4626117f0cb5535875f7181f56350ad4f195' into SPARK-34…
peter-toth Oct 5, 2021
61f2b34
Merge commit '8ae88d01b46d581367d0047b50fcfb65078ab972' into SPARK-34…
peter-toth Nov 9, 2021
532d05e
remove dependency on `spark.sql.execution.reuseSubquery`, remove unnec…
peter-toth Nov 16, 2021
c488377
refactor
attilapiros Nov 16, 2021
e0a7610
accept Attila's suggestion but keep the `merged` flag, minor name cha…
peter-toth Nov 17, 2021
63c3709
fix review findings
peter-toth Nov 17, 2021
dabbea4
add negative test case to general join matching where only a non-chil…
peter-toth Nov 17, 2021
4d97de5
amend a test to cover extra projects on both sides
peter-toth Nov 17, 2021
cc8690e
improve generic node merging
peter-toth Nov 17, 2021
83c78ca
minor fix
peter-toth Nov 24, 2021
3130913
fix merging logic if merging a plan into a merged plan
peter-toth Nov 28, 2021
3e8f7fa
remove unused mapAttributes()
peter-toth Nov 28, 2021
252c9b1
do not merge nondeterministic plans
peter-toth Dec 6, 2021
fa5e786
fix test and check adaptive path as well
peter-toth Dec 6, 2021
e292732
check test results
peter-toth Dec 6, 2021
963c423
check for same instance of subqueries
peter-toth Dec 6, 2021
9efaf2a
use the new `isCorrelated()`
peter-toth Dec 10, 2021
96a502d
move deterministic check as early as possible
peter-toth Dec 10, 2021
5b91d61
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Mar 2, 2022
8bcf515
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Mar 16, 2022
87ba289
fix comments
peter-toth Mar 18, 2022
6d5a124
rephrase general node merging
peter-toth Mar 18, 2022
851ca29
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Mar 24, 2022
96d0cab
use CTE nodes
peter-toth Mar 23, 2022
a57ed32
no need for extra shuffle with subquery `CTERelationDef`s
peter-toth Mar 24, 2022
0b34d83
regenerate expected plan stability output
peter-toth Mar 24, 2022
4985d43
remove obsolete assert
peter-toth Mar 24, 2022
de9b312
fix LogicalPlanTagInSparkPlanSuite, for logical scan plan trees consi…
peter-toth Mar 24, 2022
13a2fad
fix row-level runtime filtering as after subquery merging bloom filte…
peter-toth Mar 24, 2022
92ce6e5
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Mar 29, 2022
67ffae6
fix header scaladoc
peter-toth Apr 6, 2022
a32a85c
rename subquery flag to mergedScalarSubquery, fix CTERelationRef scal…
peter-toth Apr 6, 2022
1bc8a45
fix test name
peter-toth Apr 6, 2022
224edef
add new testcase "Merge non-correlated scalar subqueries in a subquery"
peter-toth Apr 6, 2022
a7fd1c5
add test "Merge non-correlated scalar subqueries with conflicting names"
peter-toth Apr 6, 2022
a5eb5df
add test "Merging subqueries from different places"
peter-toth Apr 6, 2022
8457148
add test "Do not merge subqueries with different join conditions", fi…
peter-toth Apr 6, 2022
1ff64e4
add test "Do not merge subqueries with different filter conditions"
peter-toth Apr 6, 2022
13a1cdb
simplify do not merge test cases
peter-toth Apr 6, 2022
4da3fe6
drop general node merging code path
peter-toth Apr 6, 2022
96ed6fd
use canonicalized form in Filter and Join condition comparison
peter-toth Apr 7, 2022
dbe81e2
simplify aggregate check
peter-toth Apr 18, 2022
ba299d5
fix aggregate grouping compare
peter-toth Apr 18, 2022
65f3425
simplify header
peter-toth Apr 18, 2022
dc5e9b9
Merge branch 'master' into SPARK-34079-multi-column-scalar-subquery
peter-toth Apr 19, 2022
c64373b
rebase on top of https://github.com/apache/spark/pull/34929
peter-toth Apr 19, 2022
3993eab
revert regenerated q5 expected output
peter-toth Apr 19, 2022
f93283d
fix nested subqueries, add test
peter-toth Apr 19, 2022
8c5c9ac
fix comment
peter-toth Apr 19, 2022
3b7ad2c
rename method
peter-toth Apr 19, 2022
c268580
fix test name
peter-toth Apr 19, 2022
169fd6b
fix removeReferences
peter-toth Apr 19, 2022
19128ff
rename merged subquery flag in cte def
peter-toth Apr 20, 2022
1c4d14b
simplify removeReferences, fix tests
peter-toth Apr 20, 2022
2590edf
fix scala 2.13
peter-toth Apr 20, 2022
@@ -56,6 +56,9 @@ case class BloomFilterMightContain(
       case e : Expression if e.foldable => TypeCheckResult.TypeCheckSuccess
       case subquery : PlanExpression[_] if !subquery.containsPattern(OUTER_REFERENCE) =>
         TypeCheckResult.TypeCheckSuccess
+      case GetStructField(subquery: PlanExpression[_], _, _)
Contributor Author

This case is needed because, with this PR, some bloom filter aggregate subqueries can be merged, and the merged result is then read through a struct field. E.g.

Join Inner, ((c1#45920 = c2#45926) AND (b1#45919 = b2#45925))
:- Filter might_contain(scalar-subquery#45986 [], xxhash64(b1#45919, 42))
:  :  +- Aggregate [bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0) AS bloomFilter#45985]
:  :     +- Filter ((isnotnull(a2#45924) AND (a2#45924 = 62)) AND (isnotnull(c2#45926) AND isnotnull(b2#45925)))
:  :        +- Relation default.bf2[a2#45924,b2#45925,c2#45926,d2#45927,e2#45928,f2#45929] parquet
:  +- Filter might_contain(scalar-subquery#45983 [], xxhash64(c1#45920, 42))
:     :  +- Aggregate [bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0) AS bloomFilter#45982]
:     :     +- Filter ((isnotnull(a2#45924) AND (a2#45924 = 62)) AND (isnotnull(c2#45926) AND isnotnull(b2#45925)))
:     :        +- Relation default.bf2[a2#45924,b2#45925,c2#45926,d2#45927,e2#45928,f2#45929] parquet
:     +- Filter (isnotnull(c1#45920) AND isnotnull(b1#45919))
:        +- Relation default.bf1[a1#45918,b1#45919,c1#45920,d1#45921,e1#45922,f1#45923] parquet
+- Filter ((isnotnull(a2#45924) AND (a2#45924 = 62)) AND (isnotnull(c2#45926) AND isnotnull(b2#45925)))
   +- Relation default.bf2[a2#45924,b2#45925,c2#45926,d2#45927,e2#45928,f2#45929] parquet

=>

WithCTE
:- CTERelationDef 0
:  +- Project [named_struct(bloomFilter, bloomFilter#45985, bloomFilter, bloomFilter#45982) AS mergedValue#45989]
:     +- Aggregate [bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0) AS bloomFilter#45985, bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0) AS bloomFilter#45982]
:        +- Filter ((isnotnull(a2#45924) AND (a2#45924 = 62)) AND (isnotnull(c2#45926) AND isnotnull(b2#45925)))
:           +- Relation default.bf2[a2#45924,b2#45925,c2#45926,d2#45927,e2#45928,f2#45929] parquet
+- Join Inner, ((c1#45920 = c2#45926) AND (b1#45919 = b2#45925))
   :- Filter might_contain(scalar-subquery#45986 [].bloomFilter, xxhash64(b1#45919, 42))
   :  :  +- CTERelationRef 0, true, [mergedValue#45989], true
   :  +- Filter might_contain(scalar-subquery#45983 [].bloomFilter, xxhash64(c1#45920, 42))
   :     :  +- CTERelationRef 0, true, [mergedValue#45989], true
   :     +- Filter (isnotnull(c1#45920) AND isnotnull(b1#45919))
   :        +- Relation default.bf1[a1#45918,b1#45919,c1#45920,d1#45921,e1#45922,f1#45923] parquet
   +- Filter ((isnotnull(a2#45924) AND (a2#45924 = 62)) AND (isnotnull(c2#45926) AND isnotnull(b2#45925)))
      +- Relation default.bf2[a2#45924,b2#45925,c2#45926,d2#45927,e2#45928,f2#45929] parquet
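
The rewrite shown in the two plans above can be illustrated with a toy model. This is only a sketch, not the PR's implementation: `Plan`, `Relation`, `Filter`, `Aggregate` and `tryMerge` below are hypothetical stand-ins for the Catalyst classes, aggregate expressions are plain strings, and only the simplest case (two aggregates over an identical child) is handled. It shows the idea of combining the aggregate expressions into one plan and recording which output position each original subquery should read.

```scala
// Toy logical plan model; these types are illustrative only, not Catalyst classes.
sealed trait Plan
final case class Relation(name: String) extends Plan
final case class Filter(condition: String, child: Plan) extends Plan
final case class Aggregate(aggregates: Seq[String], child: Plan) extends Plan

object MergeSketch {
  // Try to merge `newPlan` into `cached`. On success, return the merged plan and,
  // for each aggregate of `newPlan`, its position in the merged output.
  def tryMerge(newPlan: Plan, cached: Plan): Option[(Plan, Seq[Int])] =
    (newPlan, cached) match {
      case (Aggregate(newAggs, newChild), Aggregate(cachedAggs, cachedChild))
          if newChild == cachedChild =>
        // Keep the cached aggregates and append only those not present yet.
        val merged = (cachedAggs ++ newAggs).distinct
        Some((Aggregate(merged, cachedChild), newAggs.map(a => merged.indexOf(a))))
      case _ =>
        // Anything else is left unmerged in this sketch.
        None
    }

  def main(args: Array[String]): Unit = {
    val child = Filter("a2 = 62", Relation("bf2"))
    val first = Aggregate(Seq("bloom_filter_agg(xxhash64(b2))"), child)
    val second = Aggregate(Seq("bloom_filter_agg(xxhash64(c2))"), child)
    println(tryMerge(second, first))
  }
}
```

The returned positions play the role that the `.bloomFilter` field accesses play in the merged plan: each original subquery keeps its own expression id but reads one field of the shared result.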

+        if !subquery.containsPattern(OUTER_REFERENCE) =>
+        TypeCheckResult.TypeCheckSuccess
       case _ =>
         TypeCheckResult.TypeCheckFailure(s"The Bloom filter binary input to $prettyName " +
           "should be either a constant value or a scalar subquery expression")
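
The extended type check in this hunk can be mimicked without Spark on the classpath. The sketch below is a simplified model under stated assumptions: `Expr`, `Literal`, `ScalarSubquery`, `GetStructField`, `ColumnRef` and `checkBloomFilterInput` are hypothetical names defined here, a literal stands in for a foldable expression, and a boolean flag stands in for `containsPattern(OUTER_REFERENCE)`.

```scala
// Toy expression model; not the Catalyst expression classes.
sealed trait Expr
final case class Literal(value: Any) extends Expr
final case class ScalarSubquery(hasOuterReference: Boolean) extends Expr
final case class GetStructField(child: Expr, ordinal: Int) extends Expr
final case class ColumnRef(name: String) extends Expr

object BloomFilterInputCheck {
  // Right(()) means the input is accepted, Left(message) means it is rejected.
  def checkBloomFilterInput(e: Expr): Either[String, Unit] = e match {
    // A constant value is accepted.
    case Literal(_) => Right(())
    // A non-correlated scalar subquery is accepted.
    case ScalarSubquery(false) => Right(())
    // The added case: a field extracted from a non-correlated scalar subquery.
    case GetStructField(ScalarSubquery(false), _) => Right(())
    case _ =>
      Left("The Bloom filter binary input should be either a constant value " +
        "or a scalar subquery expression")
  }
}
```

For example, `BloomFilterInputCheck.checkBloomFilterInput(GetStructField(ScalarSubquery(false), 0))` is accepted in this model, while a plain `ColumnRef` is rejected.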

Large diffs are not rendered by default.

@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, MultiInstanceRe
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, TypedImperativeAggregate}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
@@ -671,23 +671,27 @@ case class CTERelationDef(child: LogicalPlan, id: Long = CTERelationDef.newId) e
 }

 object CTERelationDef {
-  private val curId = new java.util.concurrent.atomic.AtomicLong()
+  private[sql] val curId = new java.util.concurrent.atomic.AtomicLong()
   def newId: Long = curId.getAndIncrement()
 }

 /**
  * Represents the relation of a CTE reference.
- * @param cteId The ID of the corresponding CTE definition.
- * @param _resolved Whether this reference is resolved.
- * @param output The output attributes of this CTE reference, which can be different from
- *   the output of its corresponding CTE definition after attribute de-duplication.
- * @param statsOpt The optional statistics inferred from the corresponding CTE definition.
+ * @param cteId The ID of the corresponding CTE definition.
+ * @param _resolved Whether this reference is resolved.
+ * @param output The output attributes of this CTE reference, which can be different
+ *   from the output of its corresponding CTE definition after attribute
+ *   de-duplication.
+ * @param statsOpt The optional statistics inferred from the corresponding CTE
+ *   definition.
+ * @param mergedScalarSubquery If this reference points to a merged scalar subquery.
  */
 case class CTERelationRef(
     cteId: Long,
     _resolved: Boolean,
     override val output: Seq[Attribute],
-    statsOpt: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation {
+    statsOpt: Option[Statistics] = None,
+    mergedScalarSubquery: Boolean = false) extends LeafNode with MultiInstanceRelation {

   final override val nodePatterns: Seq[TreePattern] = Seq(CTE)

@@ -1007,6 +1011,24 @@ case class Aggregate(
   }
 }

+object Aggregate {
+  def supportsAggregationBufferSchema(schema: StructType): Boolean = {
Contributor

This function name looks confusing. How about isAggregateBufferMutable?

Contributor

Or we could simply inline this method, as it's very short anyway.

Contributor Author

Renamed in 3b7ad2c

+    schema.forall(f => UnsafeRow.isMutable(f.dataType))
+  }
+
+  def supportsHashAggregate(aggregateBufferAttributes: Seq[Attribute]): Boolean = {
+    val aggregationBufferSchema = StructType.fromAttributes(aggregateBufferAttributes)
+    supportsAggregationBufferSchema(aggregationBufferSchema)
+  }
+
+  def supportsObjectHashAggregate(aggregateExpressions: Seq[AggregateExpression]): Boolean = {
+    aggregateExpressions.map(_.aggregateFunction).exists {
+      case _: TypedImperativeAggregate[_] => true
+      case _ => false
+    }
+  }
+}
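
One plausible use of these helpers, in line with the commit "do not merge different aggregate implementations", is to merge two aggregates only when both would be planned with the same kind of physical aggregate. The sketch below is a guess at that idea, not code from this PR: `DataType`, `AggFunction`, `isMutable` and `sameImplementation` are hypothetical, and the set of mutable types is a placeholder assumption.

```scala
// Toy model of aggregation buffer types; not Spark's DataType hierarchy.
sealed trait DataType
case object LongType extends DataType
case object DoubleType extends DataType
case object StringType extends DataType

// `typedImperative` stands in for "is a TypedImperativeAggregate".
final case class AggFunction(
    name: String,
    bufferTypes: Seq[DataType],
    typedImperative: Boolean = false)

object AggImplSketch {
  // Assumption for this sketch: only fixed-width types can be updated in place.
  def isMutable(dt: DataType): Boolean = dt match {
    case LongType | DoubleType => true
    case StringType => false
  }

  // Hash aggregate needs every buffer field to be mutable.
  def supportsHashAggregate(aggs: Seq[AggFunction]): Boolean =
    aggs.flatMap(_.bufferTypes).forall(isMutable)

  // Object hash aggregate is an option if any function is typed imperative.
  def supportsObjectHashAggregate(aggs: Seq[AggFunction]): Boolean =
    aggs.exists(_.typedImperative)

  // Allow a merge only if both sides fall into the same implementation family.
  def sameImplementation(a: Seq[AggFunction], b: Seq[AggFunction]): Boolean = {
    val aHash = supportsHashAggregate(a)
    val bHash = supportsHashAggregate(b)
    (aHash && bHash) ||
      (!aHash && !bHash &&
        supportsObjectHashAggregate(a) == supportsObjectHashAggregate(b))
  }
}
```

The design choice illustrated here is conservative: when the two sides disagree on the implementation family, the sketch refuses to merge rather than risk a slower combined aggregate.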

case class Window(
windowExpressions: Seq[NamedExpression],
partitionSpec: Seq[Expression],
@@ -77,6 +77,7 @@ object TreePattern extends Enumeration {
   val REGEXP_REPLACE: Value = Value
   val RUNTIME_REPLACEABLE: Value = Value
   val SCALAR_SUBQUERY: Value = Value
+  val SCALAR_SUBQUERY_REFERENCE: Value = Value
   val SCALA_UDF: Value = Value
   val SORT: Value = Value
   val SUBQUERY_ALIAS: Value = Value
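
Pattern values like the one added here are typically used to skip subtrees that cannot contain a given node kind, as in the `containsPattern(OUTER_REFERENCE)` check earlier in this diff. The sketch below is a toy model of that pruning idea, not Spark's implementation: `ToyPattern`, `Node`, `patternBits` and `containsPattern` are all defined here for illustration.

```scala
import scala.collection.immutable.BitSet

// Toy counterpart of a pattern enumeration; only a few values are modelled.
object ToyPattern extends Enumeration {
  val SCALAR_SUBQUERY: Value = Value
  val SCALAR_SUBQUERY_REFERENCE: Value = Value
  val SORT: Value = Value
}

// Toy tree node that advertises its own patterns plus those of its children.
final case class Node(own: Seq[ToyPattern.Value], children: Seq[Node] = Nil) {
  // Union of this node's pattern ids and the ids of all descendants.
  lazy val patternBits: BitSet =
    children.foldLeft(BitSet(own.map(_.id): _*))((bits, c) => bits | c.patternBits)

  // A rule can skip this whole subtree when the pattern it targets is absent.
  def containsPattern(p: ToyPattern.Value): Boolean = patternBits.contains(p.id)
}
```

With a tree such as `Node(Seq(ToyPattern.SORT), Seq(Node(Seq(ToyPattern.SCALAR_SUBQUERY_REFERENCE))))`, a rule looking for `SCALAR_SUBQUERY_REFERENCE` would descend into it, while a rule looking for `SCALAR_SUBQUERY` could skip it.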