Skip to content

Conversation

@MaxGekk
Copy link
Owner

@MaxGekk MaxGekk commented Jul 2, 2018

What changes were proposed in this pull request?

In the PR, I propose column-based API for the pivot() function. It allows using of nested columns as the pivot column. Also this makes it consistent with how groupBy() works.

How was this patch tested?

I added new tests to DataFramePivotSuite and updated PySpark examples for the pivot() function.

@MaxGekk MaxGekk changed the title [SQL] pivot() with Column type argument [SQL] Support the pivot column of the Column type Jul 2, 2018
@MaxGekk MaxGekk changed the title [SQL] Support the pivot column of the Column type [SPARK-24722][SQL] pivot() with Column type argument Jul 2, 2018
Copy link

@rednaxelafx rednaxelafx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mostly looks good, but I'd like to ask a few things first:

  1. The new overloaded pivot() that takes Column only exist for pivot(Column, Seq[Any]). Were you planning to add a new overload for each existing String version, e.g. pivot(Column) and pivot(Column, java.util.List[Any])?
  2. Since you're adding the Column version(s) to address accessing nested columns, would it be nice to highlight that capability in the doc example? (Yes you've already included that in test case examples so that's already good)

*
* // Or without specifying column values (less efficient)
* df.groupBy("year").pivot("course").sum("earnings")
* df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slight improvement: would you like to highlight in the doc example that the Column API gives you nested column access easily?

@MaxGekk
Copy link
Owner Author

MaxGekk commented Jul 3, 2018

@rednaxelafx This PR is a draft agains the master of my forked repo. Could you add the same comments to this PR: apache#21699 . Sorry that I confused you.

@MaxGekk MaxGekk closed this Jul 3, 2018
* @since 2.4.0
*/
def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = {
def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably add a Column version API for pivot() signature with no "values" as well.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

df,
groupingExprs,
RelationalGroupedDataset.PivotType(df.resolve(pivotColumn), values.map(Literal.apply)))
RelationalGroupedDataset.PivotType(pivotColumn.expr, values.map(Literal.apply)))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we allow an arbitrary Expression as the pivot column, we might wanna accept both an literal value object or an Expression as the pivot value.
Another thing we need to do is to check groupbyExprs and pivotColumn do not share any column reference, i.e., a column ref cannot appear in both groupByExprs and pivotColumn.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might wanna accept both an literal value object or an Expression as the pivot value.

Are you sure about arbitrary expression for pivot values. Looking at the implementation, we still expect literals. I am not sure that we should give users broader choice in API.

class DataFramePivotSuite extends QueryTest with SharedSQLContext {
import testImplicits._

test("pivot courses") {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need more tests here for Pivot column as:

  1. a constant value: Although this might not cause any correctness issue, I suggest we add a check for this in our pivot method and throw an exception.
  2. an expression concerning two or more column references (we don't need to cover column list here coz it's been covered elsewhere, so think about other operators).
  3. an aggregate expression: verify that we throw the right exception.

For 1 and 3, since we only support one or multiple column references as the Pivot column before this change, Pivot node processing in Analyzer does not perform these checks. Now we need to.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MaxGekk MaxGekk deleted the pivot-column-draft branch August 17, 2019 13:34
MaxGekk pushed a commit that referenced this pull request Apr 18, 2020
…types

### What changes were proposed in this pull request?

This PR intends to fix a bug that occurs when comparing null types to decimal types in master/branch-3.0;
```
scala> Seq(BigDecimal(10)).toDF("v1").selectExpr("v1 = NULL").explain(true)
org.apache.spark.sql.AnalysisException: cannot resolve '(`v1` = NULL)' due to data type mismatch: differing types in '(`v1` = NULL)' (decimal(38,18) and null).; line 1 pos 0;
'Project [(v1#5 = null) AS (v1 = NULL)#7]
+- Project [value#2 AS v1#5]
   +- LocalRelation [value#2]
...
```
The query above passed in v2.4.5.

### Why are the changes needed?

bugfix

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added tests.

Closes apache#28241 from maropu/SPARK-31468.

Authored-by: Takeshi Yamamuro <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
MaxGekk pushed a commit that referenced this pull request Apr 24, 2020
…types

### What changes were proposed in this pull request?

This PR intends to fix a bug that occurs when comparing null types to decimal types in master/branch-3.0;
```
scala> Seq(BigDecimal(10)).toDF("v1").selectExpr("v1 = NULL").explain(true)
org.apache.spark.sql.AnalysisException: cannot resolve '(`v1` = NULL)' due to data type mismatch: differing types in '(`v1` = NULL)' (decimal(38,18) and null).; line 1 pos 0;
'Project [(v1#5 = null) AS (v1 = NULL)#7]
+- Project [value#2 AS v1#5]
   +- LocalRelation [value#2]
...
```
The query above passed in v2.4.5.

### Why are the changes needed?

bugfix

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added tests.

Closes apache#28241 from maropu/SPARK-31468.

Authored-by: Takeshi Yamamuro <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit a7fb330)
Signed-off-by: Wenchen Fan <[email protected]>
MaxGekk pushed a commit that referenced this pull request Feb 26, 2024
…n properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#45214 from zhengruifeng/connect_fix_read_join.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
MaxGekk pushed a commit that referenced this pull request Jul 22, 2025
…ingBuilder`

### What changes were proposed in this pull request?

This PR aims to improve `toString` by `JEP-280` instead of `ToStringBuilder`. In addition, `Scalastyle` and `Checkstyle` rules are added to prevent a future regression.

### Why are the changes needed?

Since Java 9, `String Concatenation` has been handled better by default.

| ID | DESCRIPTION |
| - | - |
| JEP-280 | [Indify String Concatenation](https://openjdk.org/jeps/280) |

For example, this PR improves `OpenBlocks` like the following. Both Java source code and byte code are simplified a lot by utilizing JEP-280 properly.

**CODE CHANGE**
```java

- return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
-   .append("appId", appId)
-   .append("execId", execId)
-   .append("blockIds", Arrays.toString(blockIds))
-   .toString();
+ return "OpenBlocks[appId=" + appId + ",execId=" + execId + ",blockIds=" +
+     Arrays.toString(blockIds) + "]";
```

**BEFORE**
```
  public java.lang.String toString();
    Code:
       0: new           apache#39                 // class org/apache/commons/lang3/builder/ToStringBuilder
       3: dup
       4: aload_0
       5: getstatic     apache#41                 // Field org/apache/commons/lang3/builder/ToStringStyle.SHORT_PREFIX_STYLE:Lorg/apache/commons/lang3/builder/ToStringStyle;
       8: invokespecial apache#47                 // Method org/apache/commons/lang3/builder/ToStringBuilder."<init>":(Ljava/lang/Object;Lorg/apache/commons/lang3/builder/ToStringStyle;)V
      11: ldc           apache#50                 // String appId
      13: aload_0
      14: getfield      #7                  // Field appId:Ljava/lang/String;
      17: invokevirtual apache#51                 // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
      20: ldc           apache#55                 // String execId
      22: aload_0
      23: getfield      #13                 // Field execId:Ljava/lang/String;
      26: invokevirtual apache#51                 // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
      29: ldc           apache#56                 // String blockIds
      31: aload_0
      32: getfield      #16                 // Field blockIds:[Ljava/lang/String;
      35: invokestatic  apache#57                 // Method java/util/Arrays.toString:([Ljava/lang/Object;)Ljava/lang/String;
      38: invokevirtual apache#51                 // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
      41: invokevirtual apache#61                 // Method org/apache/commons/lang3/builder/ToStringBuilder.toString:()Ljava/lang/String;
      44: areturn
```

**AFTER**
```
  public java.lang.String toString();
    Code:
       0: aload_0
       1: getfield      #7                  // Field appId:Ljava/lang/String;
       4: aload_0
       5: getfield      #13                 // Field execId:Ljava/lang/String;
       8: aload_0
       9: getfield      #16                 // Field blockIds:[Ljava/lang/String;
      12: invokestatic  apache#39                 // Method java/util/Arrays.toString:([Ljava/lang/Object;)Ljava/lang/String;
      15: invokedynamic apache#43,  0             // InvokeDynamic #0:makeConcatWithConstants:(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;
      20: areturn
```

### Does this PR introduce _any_ user-facing change?

No. This is an `toString` implementation improvement.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51572 from dongjoon-hyun/SPARK-52880.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
MaxGekk pushed a commit that referenced this pull request Sep 1, 2025
…onicalized expressions

### What changes were proposed in this pull request?

Make PullOutNonDeterministic use canonicalized expressions to dedup group and  aggregate expressions. This affects pyspark udfs in particular. Example:

```
from pyspark.sql.functions import col, avg, udf

pythonUDF = udf(lambda x: x).asNondeterministic()

spark.range(10)\
.selectExpr("id", "id % 3 as value")\
.groupBy(pythonUDF(col("value")))\
.agg(avg("id"), pythonUDF(col("value")))\
.explain(extended=True)
```

Currently results in a plan like this:

```
Aggregate [_nondeterministic#15](#15), [_nondeterministic#15 AS dummyNondeterministicUDF(value)#12, avg(id#0L) AS avg(id)#13, dummyNondeterministicUDF(value#6L)#8 AS dummyNondeterministicUDF(value)#14](#15%20AS%20dummyNondeterministicUDF(value)#12,%20avg(id#0L)%20AS%20avg(id)#13,%20dummyNondeterministicUDF(value#6L)#8%20AS%20dummyNondeterministicUDF(value)#14)
+- Project [id#0L, value#6L, dummyNondeterministicUDF(value#6L)#7 AS _nondeterministic#15](#0L,%20value#6L,%20dummyNondeterministicUDF(value#6L)#7%20AS%20_nondeterministic#15)
   +- Project [id#0L, (id#0L % cast(3 as bigint)) AS value#6L](#0L,%20(id#0L%20%%20cast(3%20as%20bigint))%20AS%20value#6L)
      +- Range (0, 10, step=1, splits=Some(2))
```

and then it throws:

```
[[MISSING_AGGREGATION] The non-aggregating expression "value" is based on columns which are not participating in the GROUP BY clause. Add the columns or the expression to the GROUP BY, aggregate the expression, or use "any_value(value)" if you do not care which of the values within a group is returned. SQLSTATE: 42803
```

- how canonicalized fixes this:
  -  nondeterministic PythonUDF expressions always have distinct resultIds per udf
  - The fix is to canonicalize the expressions when matching. Canonicalized means that we're setting the resultIds to -1, allowing us to dedup the PythonUDF expressions.
- for deterministic UDFs, this rule does not apply and "Post Analysis" batch extracts and deduplicates the expressions, as expected

### Why are the changes needed?

- the output of the query with the fix applied still makes sense - the nondeterministic UDF is invoked only once, in the project.

### Does this PR introduce _any_ user-facing change?

Yes, it's additive, it enables queries to run that previously threw errors.

### How was this patch tested?

- added unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52061 from benrobby/adhoc-fix-pull-out-nondeterministic.

Authored-by: Ben Hurdelhey <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
MaxGekk pushed a commit that referenced this pull request Nov 12, 2025
### What changes were proposed in this pull request?

This PR proposes to add `doCanonicalize` function for DataSourceV2ScanRelation. The implementation is similar to [the one in BatchScanExec](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150), as well as the [the one in LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52).

### Why are the changes needed?

Query optimization rules such as MergeScalarSubqueries check if two plans are identical by [comparing their canonicalized form](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L219). For DSv2, for physical plan, the canonicalization goes down in the child hierarchy to the BatchScanExec, which [has a doCanonicalize function](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150); for logical plan, the canonicalization goes down to the DataSourceV2ScanRelation, which, however, does not have a doCanonicalize function. As a result, two logical plans who are semantically identical are not identified.

Moreover, for reference, [DSv1 LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52) also has `doCanonicalize()`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A new unit test is added to show that `MergeScalarSubqueries` is working for DataSourceV2ScanRelation.

For a query
```sql
select (select max(i) from df) as max_i, (select min(i) from df) as min_i
```

Before introducing the canonicalization, the plan is
```
== Parsed Logical Plan ==
'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- 'Project [unresolvedalias('max('i))]
:  :  +- 'UnresolvedRelation [df], [], false
:  +- 'Project [unresolvedalias('min('i))]
:     +- 'UnresolvedRelation [df], [], false
+- OneRowRelation

== Analyzed Logical Plan ==
max_i: int, min_i: int
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- Aggregate [max(i#0) AS max(i)#7]
:  :  +- SubqueryAlias df
:  :     +- View (`df`, [i#0, j#1])
:  :        +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Aggregate [min(i#10) AS min(i)#9]
:     +- SubqueryAlias df
:        +- View (`df`, [i#10, j#11])
:           +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Optimized Logical Plan ==
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- Aggregate [max(i#0) AS max(i)#7]
:  :  +- Project [i#0]
:  :     +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Aggregate [min(i#10) AS min(i)#9]
:     +- Project [i#10]
:        +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ResultQueryStage 0
   +- *(1) Project [Subquery subquery#2, [id=apache#32] AS max_i#3, Subquery subquery#4, [id=apache#33] AS min_i#5]
      :  :- Subquery subquery#2, [id=apache#32]
      :  :  +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               ResultQueryStage 1
               +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
                  +- ShuffleQueryStage 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=58]
                        +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                           +- *(1) Project [i#0]
                              +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
            +- == Initial Plan ==
               HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19]
                  +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                     +- Project [i#0]
                        +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
      :  +- Subquery subquery#4, [id=apache#33]
      :     +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               ResultQueryStage 1
               +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
                  +- ShuffleQueryStage 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63]
                        +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                           +- *(1) Project [i#10]
                              +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
            +- == Initial Plan ==
               HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30]
                  +- HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                     +- Project [i#10]
                        +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
      +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
   Project [Subquery subquery#2, [id=apache#32] AS max_i#3, Subquery subquery#4, [id=apache#33] AS min_i#5]
   :  :- Subquery subquery#2, [id=apache#32]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            ResultQueryStage 1
            +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=58]
                     +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                        +- *(1) Project [i#0]
                           +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
         +- == Initial Plan ==
            HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
            +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19]
               +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                  +- Project [i#0]
                     +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   :  +- Subquery subquery#4, [id=apache#33]
   :     +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            ResultQueryStage 1
            +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63]
                     +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                        +- *(1) Project [i#10]
                           +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
         +- == Initial Plan ==
            HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
            +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30]
               +- HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                  +- Project [i#10]
                     +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   +- Scan OneRowRelation[]
```

After introducing the canonicalization, the plan is as following, where you can see **ReusedSubquery**
```
== Parsed Logical Plan ==
'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- 'Project [unresolvedalias('max('i))]
:  :  +- 'UnresolvedRelation [df], [], false
:  +- 'Project [unresolvedalias('min('i))]
:     +- 'UnresolvedRelation [df], [], false
+- OneRowRelation

== Analyzed Logical Plan ==
max_i: int, min_i: int
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- Aggregate [max(i#0) AS max(i)#7]
:  :  +- SubqueryAlias df
:  :     +- View (`df`, [i#0, j#1])
:  :        +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Aggregate [min(i#10) AS min(i)#9]
:     +- SubqueryAlias df
:        +- View (`df`, [i#10, j#11])
:           +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Optimized Logical Plan ==
Project [scalar-subquery#2 [].max(i) AS max_i#3, scalar-subquery#4 [].min(i) AS min_i#5]
:  :- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
:  :  +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9]
:  :     +- Project [i#0]
:  :        +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
:     +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9]
:        +- Project [i#0]
:           +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ResultQueryStage 0
   +- *(1) Project [Subquery subquery#2, [id=apache#40].max(i) AS max_i#3, ReusedSubquery Subquery subquery#2, [id=apache#40].min(i) AS min_i#5]
      :  :- Subquery subquery#2, [id=apache#40]
      :  :  +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               ResultQueryStage 1
               +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
                  +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
                     +- ShuffleQueryStage 0
                        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71]
                           +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                              +- *(1) Project [i#0]
                                 +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
            +- == Initial Plan ==
               Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
               +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22]
                     +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                        +- Project [i#0]
                           +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
      :  +- ReusedSubquery Subquery subquery#2, [id=apache#40]
      +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
   Project [Subquery subquery#2, [id=apache#40].max(i) AS max_i#3, Subquery subquery#4, [id=apache#41].min(i) AS min_i#5]
   :  :- Subquery subquery#2, [id=apache#40]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            ResultQueryStage 1
            +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
               +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
                  +- ShuffleQueryStage 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71]
                        +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                           +- *(1) Project [i#0]
                              +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
         +- == Initial Plan ==
            Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
            +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22]
                  +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                     +- Project [i#0]
                        +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   :  +- Subquery subquery#4, [id=apache#41]
   :     +- AdaptiveSparkPlan isFinalPlan=false
   :        +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
   :           +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
   :              +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=37]
   :                 +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
   :                    +- Project [i#0]
   :                       +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   +- Scan OneRowRelation[]
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52529 from yhuang-db/scan-canonicalization.

Authored-by: yhuang-db <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants