Conversation

@zhengruifeng (Contributor):
### What changes were proposed in this pull request?

On Spark Connect, `df.col("*")` should be resolved against the target plan.

### Why are the changes needed?

```python
In [6]: df1 = spark.createDataFrame([{"id": 1}])

In [7]: df2 = spark.createDataFrame([{"id": 1, "val": "v"}])

In [8]: df1.join(df2)
Out[8]: DataFrame[id: bigint, id: bigint, val: string]

In [9]: df1.join(df2).select(df1["*"])
Out[9]: DataFrame[id: bigint, id: bigint, val: string]
```

It should be:

```python
In [3]: df1.join(df2).select(df1["*"])
Out[3]: DataFrame[id: bigint]
```
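The intended semantics: `df1["*"]` should expand against the plan `df1` came from, not against the whole joined plan. Below is a minimal toy model in plain Python of that idea (all names here are hypothetical illustrations, not Spark internals; the real fix tracks a plan id per DataFrame and resolves the star server-side):

```python
# Toy model of plan-scoped star expansion (hypothetical, not Spark internals).
# Each DataFrame remembers a plan id; df["*"] expands to only the columns
# that plan produced, even after a join widens the combined output.

class ToyDataFrame:
    _next_plan_id = 0

    def __init__(self, columns):
        self.columns = list(columns)
        self.plan_id = ToyDataFrame._next_plan_id
        ToyDataFrame._next_plan_id += 1
        # Map plan id -> that plan's output columns, for star resolution.
        self.plans = {self.plan_id: self.columns}

    def join(self, other):
        joined = ToyDataFrame(self.columns + other.columns)
        joined.plans.update(self.plans)
        joined.plans.update(other.plans)
        return joined

    def select_star_of(self, df):
        # Resolve df["*"] against df's own plan, not the joined output.
        return self.plans[df.plan_id]

df1 = ToyDataFrame(["id"])
df2 = ToyDataFrame(["id", "val"])
joined = df1.join(df2)
print(joined.columns)              # ['id', 'id', 'val']
print(joined.select_star_of(df1))  # ['id']
```

Here `select_star_of` plays the role of the server-side resolution in the actual fix: look up the originating plan by id and return only that plan's output.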

### Does this PR introduce any user-facing change?

Yes.

### How was this patch tested?

Added a unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Contributor Author (@zhengruifeng):
Don't add ambiguity detection for now, since in vanilla Spark:

```python
In [7]: df1 = spark.createDataFrame([{"id": 1}])

In [8]: df1.join(df1)
Out[8]: DataFrame[id: bigint, id: bigint]

In [9]: df1.join(df1).select(df1["id"])
...
AnalysisException: Column id#0L are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same.

In [10]: df1.join(df1).select(df1["*"])
Out[10]: DataFrame[id: bigint]
```
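The asymmetry above can be illustrated with a toy in plain Python (greatly simplified and hypothetical: real Spark compares attribute ids and plan ids, not bare column names). A single attribute reference can match both copies of a column after a self-join, while star expansion just reuses the originating plan's own output list:

```python
# Toy illustration (hypothetical, not Spark code) of why df1["id"] is
# ambiguous after a self-join while df1["*"] is not.

def resolve_attribute(name, combined_output):
    # An attribute reference is matched against the combined join output;
    # two matches means the reference is ambiguous.
    matches = [c for c in combined_output if c == name]
    if len(matches) > 1:
        raise ValueError(f"Column {name} is ambiguous")
    return matches[0]

def expand_star(plan_output):
    # df1["*"] reuses df1's own output list; no matching, no ambiguity.
    return list(plan_output)

df1_output = ["id"]
self_join_output = df1_output + df1_output   # df1.join(df1) -> ["id", "id"]

try:
    resolve_attribute("id", self_join_output)
except ValueError as e:
    print(e)                     # Column id is ambiguous

print(expand_star(df1_output))   # ['id']
```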

Contributor:

It's probably a bug in vanilla Spark...

Contributor Author (@zhengruifeng):

Yeah, let me fail it in Spark Connect anyway.

Contributor Author (@zhengruifeng), Jan 11, 2024:

Deleted this helper function due to the behavior difference between `Dataset#col` and `functions#col`:

```scala
def col(colName: String): Column = colName match {
  case "*" =>
    Column(ResolvedStar(queryExecution.analyzed.output))
  case _ =>
    if (sparkSession.sessionState.conf.supportQuotedRegexColumnName) {
      colRegex(colName)
    } else {
      Column(addDataFrameIdToCol(resolve(colName)))
    }
}

def this(name: String) = this(withOrigin {
  name match {
    case "*" => UnresolvedStar(None)
    case _ if name.endsWith(".*") =>
      val parts = UnresolvedAttribute.parseAttributeName(name.substring(0, name.length - 2))
      UnresolvedStar(Some(parts))
    case _ => UnresolvedAttribute.quotedString(name)
  }
})
```

@zhengruifeng changed the title from [SPARK-46677][SQL][CONNECT] Fix df.col("*") resolution to [SPARK-46677][SQL][CONNECT] Fix dataframe["*"] resolution on Jan 11, 2024.
Contributor Author (@zhengruifeng):

TODO for myself: revisit the implementation of `colRegex`.

Contributor:

We can probably skip it in Spark Connect. It's really a weird, non-standard feature.

Contributor:

It's off by default anyway, so we can throw a proper error if it's enabled in Spark Connect.

@zhengruifeng force-pushed the py_df_star branch 4 times, most recently from 15487aa to 4ba519f, on January 11, 2024 at 12:57.
Contributor:

This does not need to be a star. It's just a placeholder and will be replaced by `ResolvedStar` after finding the matching plan.

Contributor:

Then we can handle it in `ColumnResolutionHelper` and reuse code.
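The placeholder approach discussed above can be sketched in plain Python (the node names mirror the discussion, but these classes and the `resolve` helper are hypothetical stand-ins, not Spark's actual implementation): the analyzer walks expressions and swaps the placeholder for a `ResolvedStar` built from the matching plan's output.

```python
# Hypothetical sketch of placeholder substitution during analysis.
from dataclasses import dataclass

@dataclass
class UnresolvedDataFrameStar:
    plan_id: int          # which DataFrame's plan the star belongs to

@dataclass
class ResolvedStar:
    attributes: list      # the expanded output columns

def resolve(expr, plan_outputs):
    # Replace the placeholder with the output of the matching plan;
    # leave any other expression untouched.
    if isinstance(expr, UnresolvedDataFrameStar):
        if expr.plan_id not in plan_outputs:
            raise ValueError(f"no plan with id {expr.plan_id} in scope")
        return ResolvedStar(plan_outputs[expr.plan_id])
    return expr

plan_outputs = {0: ["id"], 1: ["id", "val"]}
star = UnresolvedDataFrameStar(plan_id=0)
print(resolve(star, plan_outputs))   # ResolvedStar(attributes=['id'])
```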

Contributor Author (@zhengruifeng):

Got it.

Contributor Author (@zhengruifeng):

CI runs this test in both Connect and vanilla Spark.

Contributor Author (@zhengruifeng):

CI runs this test only in Connect.

Contributor:

What's the difference between Connect and Classic for this test?

Contributor Author (@zhengruifeng):

```python
In [4]: cdf1 = spark.createDataFrame([Row(a=1, b=2, c=3)])

In [5]: cdf2 = spark.createDataFrame([Row(a=2, b=0)])

In [6]: cdf3 = cdf1.select(cdf1.a)

In [7]: cdf3.select(cdf1["*"]).schema
...
AnalysisException: [MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_MISSING_FROM_INPUT] Resolved attribute(s) "b", "c" missing from "a" in operator !Project [a#0L, b#1L, c#2L]. SQLSTATE: XX000;
!Project [a#0L, b#1L, c#2L]
+- Project [a#0L]
   +- LogicalRDD [a#0L, b#1L, c#2L], false

In [8]: cdf1.select(cdf2["*"]).schema
...
AnalysisException: [MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION] Resolved attribute(s) "a", "b" missing from "a", "b", "c" in operator !Project [a#6L, b#7L]. Attribute(s) with the same name appear in the operation: "a", "b". Please check if the right attribute(s) are used. SQLSTATE: XX000;
!Project [a#6L, b#7L]
+- LogicalRDD [a#0L, b#1L, c#2L], false

In [9]: cdf1.join(cdf1).select(cdf1["*"]).schema
Out[9]: StructType([StructField('a', LongType(), True), StructField('b', LongType(), True), StructField('c', LongType(), True)])
```

`cdf1.join(cdf1).select(cdf1["*"])` won't fail with `AMBIGUOUS_COLUMN_REFERENCE`.

@HyukjinKwon (Member):
Merged to master.

@zhengruifeng deleted the py_df_star branch on January 16, 2024 at 00:35.
zhengruifeng added a commit that referenced this pull request Jan 16, 2024
…)` on client side

### What changes were proposed in this pull request?
Before #44689, `df["*"]` and `sf.col("*")` were both converted to `UnresolvedStar`, and `Count(UnresolvedStar)` was then converted to `Count(1)` in the Analyzer:
https://github.com/apache/spark/blob/381f3691bd481abc8f621ca3f282e06db32bea31/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L1893-L1897

In that fix, we introduced a new node `UnresolvedDataFrameStar` for `df["*"]`, which is replaced with `ResolvedStar` later. Unfortunately, it no longer matches `Count(UnresolvedStar)`. So it causes:
```
In [1]: from pyspark.sql import functions as sf

In [2]: df1 = spark.createDataFrame([{"id": 1, "val": "v"}])

In [3]: df1.select(sf.count(df1["*"]))
Out[3]: DataFrame[count(id, val): bigint]
```

which should be
```
In [3]: df1.select(sf.count(df1["*"]))
Out[3]: DataFrame[count(1): bigint]
```

In vanilla Spark, it is up to the `count` function to perform this conversion (`sf.count(df1["*"])` -> `sf.count(sf.lit(1))`); see

https://github.com/apache/spark/blob/e8dfcd3081abe16b2115bb2944a2b1cb547eca8e/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L422-L436

So it is natural to fix this behavior on the client side as well.
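That client-side conversion can be sketched as follows (a minimal toy: `Column`, `lit`, and `is_star` here are simplified hypothetical stand-ins, not the real PySpark API): when `count` receives a star column, substitute `lit(1)` before building the expression.

```python
# Hedged sketch of the client-side fix: count(df["*"]) -> count(1),
# mirroring what vanilla Spark's count function does server-side.
# All classes/functions below are simplified stand-ins.

class Column:
    def __init__(self, expr):
        self.expr = expr
    def __repr__(self):
        return f"Column({self.expr})"

def lit(v):
    return Column(repr(v))

def is_star(col):
    # Toy check; the real client inspects the expression node type.
    return col.expr == "*"

def count(col):
    if is_star(col):
        col = lit(1)   # df["*"] -> count(1)
    return Column(f"count({col.expr})")

print(count(Column("*")))    # Column(count(1))
print(count(Column("id")))   # Column(count(id))
```

The design point matches the quoted `functions.scala` snippet: the star is rewritten before it ever reaches the analyzer, so the new `UnresolvedDataFrameStar` node never needs to handle the `Count` special case.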

### Why are the changes needed?
To keep the previous behavior.

### Does this PR introduce _any_ user-facing change?
It fixes a behavior change introduced in #44689.

### How was this patch tested?
Added a unit test.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44752 from zhengruifeng/connect_fix_count_df_star.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
zhengruifeng added a commit that referenced this pull request Jan 16, 2024
### What changes were proposed in this pull request?
This PR is a follow-up of #44689, fixing `dataset.col("*")` in the Scala client.

### Why are the changes needed?
To fix `dataset.col("*")` resolution.

### Does this PR introduce _any_ user-facing change?
Yes, a bug fix.

### How was this patch tested?
Added a unit test.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44748 from zhengruifeng/connect_scala_df_star.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>