Skip to content

Conversation

@pull
Copy link

@pull pull bot commented Nov 21, 2022

See Commits and Changes for more details.


Created by pull[bot]

Can you help keep this open source service alive? 💖 Please sponsor : )

cloud-fan and others added 4 commits November 21, 2022 13:31
…caching

### What changes were proposed in this pull request?

Today, Spark is very conservative and uses the analyzed plan instead of the optimized plan as the cache key. Many cache opportunities are missed.

This PR updates `SparkSessionExtensions` to allow people to inject custom plan normalization rules. Users can pick some safe optimizer rules, or implement new rules based on their business needs, to do plan normalization and increase the cache hit rate.

### Why are the changes needed?

allow advanced users to do caching better.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new test

Closes #38692 from cloud-fan/cache.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…huffle chunk is zero-size

### What changes were proposed in this pull request?
When push-based shuffle is enabled, a zero-size buf error may occur when fetching shuffle chunks from bad nodes, especially when memory is full. In this case, we can fall back to original shuffle blocks.

### Why are the changes needed?
When the reduce task obtains the shuffle chunk with a zero-size buf, we let it fall back to original shuffle block. After verification, these blocks can be read successfully without shuffle retry.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT

Closes #38333 from gaoyajun02/SPARK-40872.

Authored-by: gaoyajun02 <[email protected]>
Signed-off-by: Mridul <mridul<at>gmail.com>
### What changes were proposed in this pull request?

Upgrade ZooKeeper to 3.6.3

### Why are the changes needed?

ZooKeeper 3.6.3 contains many bugfixes, such as a thread leak issue described in ZOOKEEPER-3706.

FYI, https://zookeeper.apache.org/doc/r3.6.3/releasenotes.html

- Why is 3.6.3 but not higher?
  - #37507
  - #32572

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests and dependency check

Closes #38733 from yaooqinn/SPARK-41211.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
…ravel spec

### What changes were proposed in this pull request?

Add TimeTravelSpec to the key of relation cache in AnalysisContext.

### Why are the changes needed?

Correct the relation resolution for the same table but different TimeTravelSpec.

### Does this PR introduce _any_ user-facing change?

yes, bug fix

### How was this patch tested?

add test

Closes #38687 from ulysses-you/time-travel-spec.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?

Today, our SQL parser only supports PIVOT/UNPIVOT at the end of the FROM clause. This is quite limited and it's better to allow PIVOT/UNPIVOT in the join children as well. As a reference, snowflake supports it: https://docs.snowflake.com/en/sql-reference/constructs/from.html

This PR makes PIVOT/UNPIVOT the same level as JOIN. Wherever you can use JOIN to extend a relation, you can also use PIVOT/UNPIVOT. Many SQL syntaxes are supported after this PR
```
FROM t1 PIVOT/UNPIVOT ... JOIN t2  // pivot/unpivot the left table
FROM t1 JOIN t2 PIVOT/UNPIVOT ...  // pivot/unpivot the join result. This is the same before this PR
FROM t1 JOIN (t2 PIVOT/UNPIVOT ...)  // pivot/unpivot the right table
FROM t1 PIVOT/UNPIVOT ... PIVOT/UNPIVOT // nested pivot/unpivot
```

### Why are the changes needed?

make PIVOT/UNPIVOT syntax more flexible

### Does this PR introduce _any_ user-facing change?

Yes, new SQL syntax without any breaking change

### How was this patch tested?

new parser tests

Closes #38713 from cloud-fan/pivot.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
@github-actions github-actions bot added the DOCS label Nov 21, 2022
…ollect` in Arrow code path

### What changes were proposed in this pull request?
1, Remove JSON code path;
2, use RDD.collect in Arrow code path, since existing tests were already broken in Arrow code path;
3, reenable `test_fill_na`

### Why are the changes needed?
existing Arrow code path is still problematic and it fails and fallback to JSON code path, which change the output datatypes of `test_fill_na`

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
reenabled test and added UT

Closes #38706 from zhengruifeng/collect_disable_json.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
Yaohua628 and others added 2 commits November 22, 2022 09:54
…ue consistent

### What changes were proposed in this pull request?
In FileSourceStrategy, we add an Alias node to wrap the file metadata fields (e.g. file_name, file_size) in a NamedStruct ([here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L279)). But `CreateNamedStruct` has an override `nullable` value `false` ([here](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala#L443)), which is different from the `_metadata` struct `nullable` value `true` ([here](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala#L467)).

This PR fixes this, by changing `_metadata` column to be always not nullable. Rationale:
1. By definition, `_metadata` for file-based sources is always not null;
2. If users have already persisted this nullable `_metadata` somewhere, then it's totally fine to write non-nullable data to this nullable column.

### Why are the changes needed?
For stateful streaming, we store the schema in the state store and [check consistency across batches](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala#L47). To avoid state schema compatibility mismatched, we should keep nullable consistent in `_metadata`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New UT

Closes #38683 from Yaohua628/spark-41151.

Authored-by: yaohua <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
### What changes were proposed in this pull request?
pin `protobuf==3.19.4` in tests

### Why are the changes needed?
versions were already changed in #38693

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
updated CI

Closes #38729 from zhengruifeng/connect_infra_protobuf.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
@github-actions github-actions bot added the INFRA label Nov 22, 2022
…nondeterministic predicates

### What changes were proposed in this pull request?

This PR fixes a regression caused by #38511 . For `FROM t WHERE rand() > 0.5 AND col = 1`, we can still push down `col = 1` because we don't guarantee the predicates evaluation order within a `Filter`.

This PR updates `ScanOperation` to consider this case and bring back the previous pushdown behavior.

### Why are the changes needed?

fix perf regression

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes #38746 from cloud-fan/filter.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@pull pull bot merged commit 0cdbda1 into huangxiaopingRD:master Nov 22, 2022
pull bot pushed a commit that referenced this pull request Nov 1, 2025
### What changes were proposed in this pull request?

This PR proposes to add `doCanonicalize` function for DataSourceV2ScanRelation. The implementation is similar to [the one in BatchScanExec](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150), as well as the [the one in LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52).

### Why are the changes needed?

Query optimization rules such as MergeScalarSubqueries check if two plans are identical by [comparing their canonicalized form](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L219). For DSv2, for physical plan, the canonicalization goes down in the child hierarchy to the BatchScanExec, which [has a doCanonicalize function](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150); for logical plan, the canonicalization goes down to the DataSourceV2ScanRelation, which, however, does not have a doCanonicalize function. As a result, two logical plans who are semantically identical are not identified.

Moreover, for reference, [DSv1 LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52) also has `doCanonicalize()`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A new unit test is added to show that `MergeScalarSubqueries` is working for DataSourceV2ScanRelation.

For a query
```sql
select (select max(i) from df) as max_i, (select min(i) from df) as min_i
```

Before introducing the canonicalization, the plan is
```
== Parsed Logical Plan ==
'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- 'Project [unresolvedalias('max('i))]
:  :  +- 'UnresolvedRelation [df], [], false
:  +- 'Project [unresolvedalias('min('i))]
:     +- 'UnresolvedRelation [df], [], false
+- OneRowRelation

== Analyzed Logical Plan ==
max_i: int, min_i: int
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- Aggregate [max(i#0) AS max(i)#7]
:  :  +- SubqueryAlias df
:  :     +- View (`df`, [i#0, j#1])
:  :        +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Aggregate [min(i#10) AS min(i)#9]
:     +- SubqueryAlias df
:        +- View (`df`, [i#10, j#11])
:           +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Optimized Logical Plan ==
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- Aggregate [max(i#0) AS max(i)#7]
:  :  +- Project [i#0]
:  :     +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Aggregate [min(i#10) AS min(i)#9]
:     +- Project [i#10]
:        +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ResultQueryStage 0
   +- *(1) Project [Subquery subquery#2, [id=#32] AS max_i#3, Subquery subquery#4, [id=#33] AS min_i#5]
      :  :- Subquery subquery#2, [id=#32]
      :  :  +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               ResultQueryStage 1
               +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
                  +- ShuffleQueryStage 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=58]
                        +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                           +- *(1) Project [i#0]
                              +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
            +- == Initial Plan ==
               HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19]
                  +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                     +- Project [i#0]
                        +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
      :  +- Subquery subquery#4, [id=#33]
      :     +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               ResultQueryStage 1
               +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
                  +- ShuffleQueryStage 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63]
                        +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                           +- *(1) Project [i#10]
                              +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
            +- == Initial Plan ==
               HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30]
                  +- HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                     +- Project [i#10]
                        +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
      +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
   Project [Subquery subquery#2, [id=#32] AS max_i#3, Subquery subquery#4, [id=#33] AS min_i#5]
   :  :- Subquery subquery#2, [id=#32]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            ResultQueryStage 1
            +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=58]
                     +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                        +- *(1) Project [i#0]
                           +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
         +- == Initial Plan ==
            HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
            +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19]
               +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                  +- Project [i#0]
                     +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   :  +- Subquery subquery#4, [id=#33]
   :     +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            ResultQueryStage 1
            +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63]
                     +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                        +- *(1) Project [i#10]
                           +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
         +- == Initial Plan ==
            HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
            +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30]
               +- HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                  +- Project [i#10]
                     +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   +- Scan OneRowRelation[]
```

After introducing the canonicalization, the plan is as following, where you can see **ReusedSubquery**
```
== Parsed Logical Plan ==
'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- 'Project [unresolvedalias('max('i))]
:  :  +- 'UnresolvedRelation [df], [], false
:  +- 'Project [unresolvedalias('min('i))]
:     +- 'UnresolvedRelation [df], [], false
+- OneRowRelation

== Analyzed Logical Plan ==
max_i: int, min_i: int
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- Aggregate [max(i#0) AS max(i)#7]
:  :  +- SubqueryAlias df
:  :     +- View (`df`, [i#0, j#1])
:  :        +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Aggregate [min(i#10) AS min(i)#9]
:     +- SubqueryAlias df
:        +- View (`df`, [i#10, j#11])
:           +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Optimized Logical Plan ==
Project [scalar-subquery#2 [].max(i) AS max_i#3, scalar-subquery#4 [].min(i) AS min_i#5]
:  :- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
:  :  +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9]
:  :     +- Project [i#0]
:  :        +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
:     +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9]
:        +- Project [i#0]
:           +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ResultQueryStage 0
   +- *(1) Project [Subquery subquery#2, [id=#40].max(i) AS max_i#3, ReusedSubquery Subquery subquery#2, [id=#40].min(i) AS min_i#5]
      :  :- Subquery subquery#2, [id=#40]
      :  :  +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               ResultQueryStage 1
               +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
                  +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
                     +- ShuffleQueryStage 0
                        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71]
                           +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                              +- *(1) Project [i#0]
                                 +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
            +- == Initial Plan ==
               Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
               +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22]
                     +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                        +- Project [i#0]
                           +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
      :  +- ReusedSubquery Subquery subquery#2, [id=#40]
      +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
   Project [Subquery subquery#2, [id=#40].max(i) AS max_i#3, Subquery subquery#4, [id=#41].min(i) AS min_i#5]
   :  :- Subquery subquery#2, [id=#40]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            ResultQueryStage 1
            +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
               +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
                  +- ShuffleQueryStage 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71]
                        +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                           +- *(1) Project [i#0]
                              +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
         +- == Initial Plan ==
            Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
            +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22]
                  +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                     +- Project [i#0]
                        +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   :  +- Subquery subquery#4, [id=#41]
   :     +- AdaptiveSparkPlan isFinalPlan=false
   :        +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
   :           +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
   :              +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=37]
   :                 +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
   :                    +- Project [i#0]
   :                       +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   +- Scan OneRowRelation[]
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52529 from yhuang-db/scan-canonicalization.

Authored-by: yhuang-db <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
huangxiaopingRD pushed a commit that referenced this pull request Nov 25, 2025
### What changes were proposed in this pull request?

This PR proposes to add `doCanonicalize` function for DataSourceV2ScanRelation. The implementation is similar to [the one in BatchScanExec](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150), as well as the [the one in LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52).

### Why are the changes needed?

Query optimization rules such as MergeScalarSubqueries check if two plans are identical by [comparing their canonicalized form](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L219). For DSv2, for physical plan, the canonicalization goes down in the child hierarchy to the BatchScanExec, which [has a doCanonicalize function](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150); for logical plan, the canonicalization goes down to the DataSourceV2ScanRelation, which, however, does not have a doCanonicalize function. As a result, two logical plans who are semantically identical are not identified.

Moreover, for reference, [DSv1 LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52) also has `doCanonicalize()`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A new unit test is added to show that `MergeScalarSubqueries` is working for DataSourceV2ScanRelation.

For a query
```sql
select (select max(i) from df) as max_i, (select min(i) from df) as min_i
```

Before introducing the canonicalization, the plan is
```
== Parsed Logical Plan ==
'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- 'Project [unresolvedalias('max('i))]
:  :  +- 'UnresolvedRelation [df], [], false
:  +- 'Project [unresolvedalias('min('i))]
:     +- 'UnresolvedRelation [df], [], false
+- OneRowRelation

== Analyzed Logical Plan ==
max_i: int, min_i: int
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- Aggregate [max(i#0) AS max(i)#7]
:  :  +- SubqueryAlias df
:  :     +- View (`df`, [i#0, j#1])
:  :        +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Aggregate [min(i#10) AS min(i)#9]
:     +- SubqueryAlias df
:        +- View (`df`, [i#10, j#11])
:           +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Optimized Logical Plan ==
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- Aggregate [max(i#0) AS max(i)#7]
:  :  +- Project [i#0]
:  :     +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Aggregate [min(i#10) AS min(i)#9]
:     +- Project [i#10]
:        +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ResultQueryStage 0
   +- *(1) Project [Subquery subquery#2, [id=#32] AS max_i#3, Subquery subquery#4, [id=#33] AS min_i#5]
      :  :- Subquery subquery#2, [id=#32]
      :  :  +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               ResultQueryStage 1
               +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
                  +- ShuffleQueryStage 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=58]
                        +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                           +- *(1) Project [i#0]
                              +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
            +- == Initial Plan ==
               HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19]
                  +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                     +- Project [i#0]
                        +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
      :  +- Subquery subquery#4, [id=#33]
      :     +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               ResultQueryStage 1
               +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
                  +- ShuffleQueryStage 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63]
                        +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                           +- *(1) Project [i#10]
                              +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
            +- == Initial Plan ==
               HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30]
                  +- HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                     +- Project [i#10]
                        +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
      +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
   Project [Subquery subquery#2, [id=#32] AS max_i#3, Subquery subquery#4, [id=#33] AS min_i#5]
   :  :- Subquery subquery#2, [id=#32]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            ResultQueryStage 1
            +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=58]
                     +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                        +- *(1) Project [i#0]
                           +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
         +- == Initial Plan ==
            HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
            +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19]
               +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                  +- Project [i#0]
                     +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   :  +- Subquery subquery#4, [id=#33]
   :     +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            ResultQueryStage 1
            +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63]
                     +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                        +- *(1) Project [i#10]
                           +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
         +- == Initial Plan ==
            HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
            +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30]
               +- HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                  +- Project [i#10]
                     +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   +- Scan OneRowRelation[]
```

After introducing the canonicalization, the plan is as following, where you can see **ReusedSubquery**
```
== Parsed Logical Plan ==
'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- 'Project [unresolvedalias('max('i))]
:  :  +- 'UnresolvedRelation [df], [], false
:  +- 'Project [unresolvedalias('min('i))]
:     +- 'UnresolvedRelation [df], [], false
+- OneRowRelation

== Analyzed Logical Plan ==
max_i: int, min_i: int
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- Aggregate [max(i#0) AS max(i)#7]
:  :  +- SubqueryAlias df
:  :     +- View (`df`, [i#0, j#1])
:  :        +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Aggregate [min(i#10) AS min(i)#9]
:     +- SubqueryAlias df
:        +- View (`df`, [i#10, j#11])
:           +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Optimized Logical Plan ==
Project [scalar-subquery#2 [].max(i) AS max_i#3, scalar-subquery#4 [].min(i) AS min_i#5]
:  :- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
:  :  +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9]
:  :     +- Project [i#0]
:  :        +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
:     +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9]
:        +- Project [i#0]
:           +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ResultQueryStage 0
   +- *(1) Project [Subquery subquery#2, [id=#40].max(i) AS max_i#3, ReusedSubquery Subquery subquery#2, [id=#40].min(i) AS min_i#5]
      :  :- Subquery subquery#2, [id=#40]
      :  :  +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               ResultQueryStage 1
               +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
                  +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
                     +- ShuffleQueryStage 0
                        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71]
                           +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                              +- *(1) Project [i#0]
                                 +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
            +- == Initial Plan ==
               Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
               +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22]
                     +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                        +- Project [i#0]
                           +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
      :  +- ReusedSubquery Subquery subquery#2, [id=#40]
      +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
   Project [Subquery subquery#2, [id=#40].max(i) AS max_i#3, Subquery subquery#4, [id=#41].min(i) AS min_i#5]
   :  :- Subquery subquery#2, [id=#40]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            ResultQueryStage 1
            +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
               +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
                  +- ShuffleQueryStage 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71]
                        +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                           +- *(1) Project [i#0]
                              +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
         +- == Initial Plan ==
            Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
            +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22]
                  +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                     +- Project [i#0]
                        +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   :  +- Subquery subquery#4, [id=#41]
   :     +- AdaptiveSparkPlan isFinalPlan=false
   :        +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
   :           +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
   :              +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=37]
   :                 +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
   :                    +- Project [i#0]
   :                       +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   +- Scan OneRowRelation[]
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52529 from yhuang-db/scan-canonicalization.

Authored-by: yhuang-db <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants