[SPARK-40812][CONNECT] Add Deduplicate to Connect proto and DSL #38276
Conversation
R: @cloud-fan
Can one of the admins verify this patch?
12?
Deduplicate deduplicate = 13;
Deduplicate deduplicate = 12;
12 is taken by Sample, though that PR is not merged :) #38227
This line will cause a merge conflict: whichever PR goes in first takes 12, and the other takes 13.
val connectPlan = {
  import org.apache.spark.sql.connect.dsl.plans._
  Dataset.ofRows(spark, transform(connectTestRelation.distinct()))
}
val sparkPlan = sparkTestRelation.distinct()
comparePlans(connectPlan.queryExecution.analyzed, sparkPlan.queryExecution.analyzed, false)
val connectPlan2 = {
  import org.apache.spark.sql.connect.dsl.plans._
  Dataset.ofRows(spark, transform(connectTestRelation.deduplicate(Seq("key", "value"))))
}

import org.apache.spark.sql.connect.dsl.plans._
val connectPlan = Dataset.ofRows(spark, transform(connectTestRelation.distinct()))
val sparkPlan = sparkTestRelation.distinct()
comparePlans(connectPlan.queryExecution.analyzed, sparkPlan.queryExecution.analyzed, false)
val connectPlan2 = Dataset.ofRows(spark, transform(connectTestRelation.deduplicate(Seq("key", "value"))))
I think there was an issue here with the way that the two implicits of Spark and Spark Connect DSL are handled.
Yeah, here is some context that people may not know:
Scala does not allow two implicits defined in the same scope even when there is no ambiguity; in that case Scala just ignores one of the implementations. The workaround was to use a sub-scope to limit one implicit (the Connect one), while the parent scope imports the other implicit.
See this comment for the context (line 42 in 4201a59):
// TODO: Scala only allows one implicit per scope so we keep proto implicit imports in
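To make the scoping trick concrete, here is a minimal, self-contained Scala sketch; the Rel, PlanOps, CatalystLikeDsl, and ConnectLikeDsl names are invented for illustration and are not Spark's actual DSL objects:

final case class Rel(name: String)

object CatalystLikeDsl {
  // Catalyst-style extension: same implicit class name as the Connect-style one below.
  implicit class PlanOps(val rel: Rel) {
    def distinct(): String = s"CatalystDistinct(${rel.name})"
  }
}

object ConnectLikeDsl {
  // Connect-style extension with an identically named implicit class.
  implicit class PlanOps(val rel: Rel) {
    def distinct(): String = s"ConnectDistinct(${rel.name})"
  }
}

object ImplicitScopeDemo extends App {
  import CatalystLikeDsl._ // parent scope imports only the Catalyst-style implicit

  val rel = Rel("t")
  val sparkPlan = rel.distinct() // resolves via CatalystLikeDsl.PlanOps

  val connectPlan = {
    // The nested import shadows the outer PlanOps by name, so inside this
    // block only the Connect-style implicit is eligible -- no ambiguity.
    import ConnectLikeDsl._
    rel.distinct()
  }

  println(sparkPlan)   // CatalystDistinct(t)
  println(connectPlan) // ConnectDistinct(t)
}

Importing both DSLs into the same scope instead would make PlanOps ambiguous and neither extension usable, which is the conflict the sub-scope works around.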
is it possible to reuse SharedSparkSessionBase?
This is relevant to the comment above: SharedSparkSession and its base define implicits that cause ambiguity with the Catalyst implicits. For the current SparkConnectProtoSuite, we cannot let it inherit SharedSparkSession. Meanwhile, for testing this Deduplicate implementation we need a session. This is why this PR does some refactoring on the test suites to separate them.
Please rebase to enable the codegen check.
We don't need to do a name lookup. We can just create Deduplicate(allColumns, queryExecution.analyzed).
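For reference, a minimal sketch of that suggestion against Catalyst's Deduplicate logical node (the helper name is hypothetical):

import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, LogicalPlan}

// Hypothetical helper: distinct over all columns needs no name lookup, because
// the analyzed child plan already exposes its output attributes.
def distinctOnAllColumns(analyzedChild: LogicalPlan): LogicalPlan =
  Deduplicate(analyzedChild.output, analyzedChild)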
done.
Given the usage of is_all, can't this be inferred from the list of columns? If they're identical, is_all is true?
So the question is whether is_all is a nice convenience hack or strictly necessary; I'm OK with both :)
It is a nice convenience: users do not need to know the schema of the previous LogicalPlan when they just want to chain a DISTINCT. In this case users let the backend resolve the complete schema of the previous operation. (Though users could do val t = df.schema(); df.deduplicate(t).)
From another perspective, if we want to match the existing DataFrame API (which has distinct()), this becomes necessary.
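For illustration, the two forms as they appear with the Connect DSL in this PR's tests (connectTestRelation is the suite's test fixture):

import org.apache.spark.sql.connect.dsl.plans._

// distinct(): the client does not need to know the input schema; the proto
// marks all columns as keys and the server resolves the actual column list.
val distinctPlan = connectTestRelation.distinct()

// deduplicate(cols): compare only the named key columns, but keep every
// column in the output rows.
val dedupPlan = connectTestRelation.deduplicate(Seq("key", "value"))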
Can you fix the conflicts?
Force-pushed ad0451a to 138bc4e.
Oh, conflicts again...
@cloud-fan let me fix. It is very easy to get conflicts on the auto-generated Python proto files; they act as a "single point of failure".
Force-pushed a25a351 to 8f0ad05.
I don't think Spark Connect will have backward compatibility issues. It's a new API.
I see. Removed this comment.
This makes me think that we probably shouldn't use the Catalyst DSL at all. The tests need the Spark Connect planner, which needs a SparkSession. Some tests happen to not invoke the SparkSession, but it's a bit hacky to rely on that assumption. We should just compare plans produced by the proto DSL and the DataFrame API.
We decided to go with the Catalyst DSL in #37994. However, that also caused the pain of needing the small scopes to avoid implicit conflicts.
Given that we need a session-based test, migrating all such tests to the same place that uses a session and the DataFrame API just makes sense. I also believe that with this approach we no longer have implicit conflicts in the same scope.
How about I send a follow-up PR for the test refactoring after this PR? Or we can have the refactoring happen first (I prefer a separate PR) and then I rebase this one. Either way is fine with me.
We can do it in a follow-up.
Force-pushed 40b9c5b to bf453b3.
seems this is not used?
Oh yes, because I had to use the DataFrame API for testing this time.
Removed.
Force-pushed bf453b3 to 9110226.
I will monitor the build job. Seeing more flakiness than usual.
Thanks, merging to master!
if (rel.getAllColumnsAsKeys && rel.getColumnNamesCount > 0) {
  throw InvalidPlanInput("Cannot deduplicate on both all columns and a subset of columns")
}
if (!rel.getAllColumnsAsKeys && rel.getColumnNamesCount == 0) {
Why do we need getAllColumnsAsKeys? Seems like we can just tell when the columns are not set.
And this does not match the logical plan.
The issue is that in the Spark Connect client we only see column names, not expr IDs. If the DataFrame has duplicated column names, then deduplicating by all columns can't work in the Spark Connect client.
Actually, the column names are unknown too, as the input plan is unresolved.
I mean, we don't need the rel.getAllColumnsAsKeys condition, because we can tell that's the case when rel.getColumnNamesCount == 0.
"I mean, we don't need the rel.getAllColumnsAsKeys condition, because we can tell that's the case when rel.getColumnNamesCount == 0."
I want to clarify this specifically:
This is one of the Connect proto API design principles: we need to differentiate whether a field is explicitly set or not set; put another way, every intention should be expressed explicitly. Ultimately, this is to avoid ambiguity on the API surface.
One example is Project. If we see a Project with nothing in the project list, how do we interpret it? Does the user want to indicate SELECT *? Or did the user actually generate an invalid plan? The problem is that there are two possible readings of one plan, and the worse part is that one reading is a valid plan and the other is not. This led us to explicitly encode SELECT * into the proto in #38023.
So one of the reasons we have a bool flag here is to avoid using rel.getColumnNamesCount == 0 to infer distinct on all columns, which is exactly the kind of ambiguity problem described above.
This might not be great, because a few more fields can bring another problem: what if the user sets them all? In terms of ambiguity, though, this is not an issue: we know that is an invalid plan, with no second interpretation.
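To spell out the case analysis the flag buys us, here is a sketch (the accessor values mirror the generated proto getters in the snippet above; the helper types are illustrative):

sealed trait DedupKeys
case object AllColumns extends DedupKeys
final case class Subset(columnNames: Seq[String]) extends DedupKeys

def resolveKeys(allColumnsAsKeys: Boolean, columnNames: Seq[String]): DedupKeys =
  (allColumnsAsKeys, columnNames.nonEmpty) match {
    case (true, true) =>
      // Both intentions expressed at once: unambiguously invalid.
      throw new IllegalArgumentException(
        "Cannot deduplicate on both all columns and a subset of columns")
    case (false, false) =>
      // No intention expressed at all: also unambiguously invalid.
      throw new IllegalArgumentException(
        "Deduplicate requires either all_columns_as_keys or column_names")
    case (true, false) => AllColumns          // df.distinct()
    case (false, true) => Subset(columnNames) // df.dropDuplicates(cols)
  }

Every combination maps to exactly one interpretation, valid or invalid; nothing has to be inferred from an empty list.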
…on client
What changes were proposed in this pull request?
Following up on #38276, this PR improves both the distinct() and dropDuplicates DataFrame APIs in the Python client, both of which depend on the Deduplicate plan in the Connect proto.
Why are the changes needed?
Improve API coverage.
Does this PR introduce any user-facing change?
No
How was this patch tested?
UT
Closes #38327 from amaliujia/python_deduplicate.
Authored-by: Rui Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
This PR supports Deduplicate in the Connect proto and DSL.
Note that Deduplicate cannot be replaced by SQL's SELECT DISTINCT col_list. The difference is that Deduplicate removes duplicated rows based on a set of columns but returns all the columns. SQL's SELECT DISTINCT col_list, instead, can only return col_list.
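A small example of the difference (a sketch assuming a local SparkSession named spark; the data is illustrative):

import spark.implicits._

val df = Seq(("a", 1, "x"), ("a", 2, "y")).toDF("key", "value", "extra")

// SELECT DISTINCT key: deduplicates, but the output schema is only [key].
val sqlStyle = df.select("key").distinct()

// Deduplicate on key: at most one row per key, full schema [key, value, extra].
val dedup = df.dropDuplicates(Seq("key"))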
Why are the changes needed?
1. To improve proto API coverage.
2. Deduplicate blocks [SPARK-40713][CONNECT] Improve SET operation support in the proto and the server #38166, because we want to support Union(isAll=false), but that will return Union().Distinct() to match the existing DataFrame API. Deduplicate is needed to write test cases for Union(isAll=false).
Does this PR introduce any user-facing change?
No
How was this patch tested?
UT
Closes #38276 from amaliujia/supportDropDuplicates.
Authored-by: Rui Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>