[SPARK-53593][SDP] Add response field for DefineDataset and DefineFlow RPC #52328
Conversation
SCHJonathan left a comment:

Mostly LGTM
SCHJonathan left a comment:

Can you fix the PR description to follow the standard format?
See reference: #51590
```proto
  DefineDatasetResult define_dataset_result = 2;
  DefineFlowResult define_flow_result = 3;
}

message CatalogIdentifier {
```
@gengliangwang @cloud-fan – thoughts on whether CatalogIdentifier is the right name and whether this is the right location for this message? Since this is a type that might end up useful elsewhere as well, I wonder if it would make more sense as a top-level message inside base.proto or catalog.proto.
Should it just be Identifier?
Identifier is pretty general. The Identifier class in Scala is scoped within the catalog package. If we had a similar package within the proto namespace, then an Identifier proto could make sense?
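For context on the message being discussed, here is a minimal sketch of what a catalog-scoped identifier message could look like — the field names and numbers are illustrative assumptions, not the final API that was merged:

```proto
// Illustrative sketch only; actual field names and numbers may differ.
message CatalogIdentifier {
  // Raw (unescaped) name parts, kept as separate fields.
  optional string catalog_name = 1;
  optional string database_name = 2;
  string table_name = 3;
}
```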
sryza left a comment:

I left two more comments. After addressing those, this LGTM!
Thanks for bearing with the back and forth on this – protos can't be changed after they're released, so it's important to get them right the first time.
sryza left a comment:

LGTM!
Going to wait a day before merging to give others a chance to look at the proto changes. cc @hvanhovell for the Spark Connect proto changes.
```scala
val resolvedDataset =
  defineDataset(cmd.getDefineDataset, sessionHolder)
val identifierBuilder = CatalogIdentifier.newBuilder()
resolvedDataset.resolvedCatalog.foreach(identifierBuilder.setCatalogName)
```
The reason we quote the names in CatalystIdentifier is because we need to combine them together as a single string for qualified name. But the protobuf message keeps catalog/schema/table name as separated fields, why do we quote the name?
We store the raw names in each field of the trait CatalystIdentifier, and we should do the same for the corresponding protobuf message.
I think there is a misunderstanding here. Looking at the code
```scala
def quoteIdentifier(name: String): String = name.replace("`", "``")
...
def resolvedCatalog: Option[String] = catalog.map(quoteIdentifier)
```
This is not the raw name, and we will see the difference if the catalog name contains backticks.
This is an obvious bug. We did so in CatalogIdentifier because we use it in s"`${resolvedCatalog.get}`.`${resolvedDb.get}`.`$resolvedId`". It does not make sense when we set the protobuf fields.
cc @sryza
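The distinction can be seen with a small standalone sketch (not Spark's actual code — just the same escaping logic as the `quoteIdentifier` shown above): once a name passes through the quoting step, any embedded backtick is doubled, so the stored value is no longer the raw name.

```scala
// Standalone sketch of the escaping behavior discussed above.
// Assumption: mirrors the quoteIdentifier shown in this thread, not Spark internals.
object QuoteDemo {
  // Doubles backticks so the name can be safely embedded in `...` quoting.
  def quoteIdentifier(name: String): String = name.replace("`", "``")

  def main(args: Array[String]): Unit = {
    val raw = "my`catalog"
    val quoted = quoteIdentifier(raw)
    println(quoted)       // my``catalog — differs from the raw name
    println(s"`$quoted`") // `my``catalog` — only correct as a quoted display form
  }
}
```

This is why escaping belongs at the point where the parts are joined into one qualified string, not in the separate protobuf fields.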
I believe @cloud-fan is right. @cookiedough77 – are you up for creating a followup that addresses this? I think we can revert the changes inside identifiers.scala and populate the values of the proto with the existing identifier / database / catalog properties of CatalystIdentifier.
yep, working on this, thanks for pointing it out
fix PR: #52483
Going to merge now that @cloud-fan's feedback has been addressed.
…th `4.1.0-preview3` RC1

### What changes were proposed in this pull request?

This PR aims to update Spark Connect-generated Swift source code with Apache Spark `4.1.0-preview3` RC1.

### Why are the changes needed?

There are many changes between Apache Spark 4.1.0-preview2 and preview3:

- apache/spark#52685
- apache/spark#52613
- apache/spark#52553
- apache/spark#52532
- apache/spark#52517
- apache/spark#52514
- apache/spark#52487
- apache/spark#52328
- apache/spark#52200
- apache/spark#52154
- apache/spark#51344

To use the latest bug fixes and new messages to develop new features for `4.1.0-preview3`:

```
$ git clone -b v4.1.0-preview3 https://github.com/apache/spark.git
$ cd spark/sql/connect/common/src/main/protobuf/
$ protoc --swift_out=. spark/connect/*.proto
$ protoc --grpc-swift_out=. spark/connect/*.proto
// Remove empty GRPC files
$ cd spark/connect
$ grep 'This file contained no services' * | awk -F: '{print $1}' | xargs rm
```

### Does this PR introduce _any_ user-facing change?

Pass the CIs.

### How was this patch tested?

Pass the CIs. I manually tested with `Apache Spark 4.1.0-preview3` (with the two SDP ignored tests).

```
$ swift test --no-parallel
...
✔ Test run with 203 tests in 21 suites passed after 19.088 seconds.
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #252 from dongjoon-hyun/SPARK-54043.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…w RPC

### What changes were proposed in this pull request?

This PR updates the Spark Connect server to return resolved dataset and flow names in the responses of the DefineDataset and DefineFlow RPCs. Changes include:

1. Adding resolved_data_name and resolved_flow_name to the respective proto response messages.
2. Updating the RPC handlers to return resolved identifiers in the response.
3. Adding unit tests in SparkDeclarativePipelinesServerSuite to validate the resolved names.

### Why are the changes needed?

The SC client requires the resolved names for datasets and flows to support graph resolution in the LDP frontend. Returning this information from the server ensures consistent naming and proper registration.

### Does this PR introduce _any_ user-facing change?

Yes. The DefineDataset and DefineFlow RPC responses now include fully qualified names like `catalog`.`db`.`mv`. Implicit flows to temp views return unqualified names like `mv`.

### How was this patch tested?

Added targeted unit tests in SparkDeclarativePipelinesServerSuite. Verified both default and custom catalog/database cases. Run the tests using:

```
./build/sbt
> project connect
> testOnly *SparkDeclarativePipelinesServerSuite
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52328 from cookiedough77/jessie.luo_data/spark-add-response.

Lead-authored-by: Jessie Luo <[email protected]>
Co-authored-by: cookiedough77 <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
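As a rough illustration of the naming behavior described in the PR description, a fully qualified name can be assembled from optional catalog/database parts, falling back to the bare name when they are absent. This is a sketch under assumed semantics, not the server's actual implementation:

```scala
// Sketch: build `catalog`.`db`.`mv`-style names; unqualified when parts are missing.
// Assumption: this mirrors the qualification rules described in the PR description.
object QualifiedName {
  // Backtick-quote one name part, escaping embedded backticks.
  def quote(part: String): String = "`" + part.replace("`", "``") + "`"

  def fullyQualified(catalog: Option[String], db: Option[String], name: String): String =
    (catalog, db) match {
      case (Some(c), Some(d)) => s"${quote(c)}.${quote(d)}.${quote(name)}"
      case _                  => name // e.g. implicit flows to temp views stay unqualified
    }

  def main(args: Array[String]): Unit = {
    println(fullyQualified(Some("catalog"), Some("db"), "mv")) // `catalog`.`db`.`mv`
    println(fullyQualified(None, None, "mv"))                  // mv
  }
}
```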