[SPARK-40737][CONNECT] Add basic support for DataFrameWriter #38192
Conversation
It is a bit weird to have this in the SparkPlanner node, but I guess this is the consequence of the builder() API we have in the DataFrameWriter.
@cloud-fan AFAIK you have been working on making writes more declarative (i.e. planned writes). Do you see a way to improve this?
cc @allisonwang-db FYI
This is about more than planned writes: we need to create a logical plan for the DataFrame write, instead of putting implementation code in the DataFrame write APIs.
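For illustration, a minimal sketch of what such a dedicated logical plan node could look like, assuming Spark 3.2+ Catalyst APIs; the node name `WriteOperation` and its fields here are illustrative, not the PR's actual code:

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}

// Hypothetical node: all write parameters captured declaratively,
// so the planner (not the DataFrameWriter) decides how to execute.
case class WriteOperation(
    child: LogicalPlan,            // query producing the rows to write
    source: String,                // data source, e.g. "parquet"
    mode: SaveMode,                // Append, Overwrite, ...
    options: Map[String, String])  // source-specific write options
  extends UnaryNode {
  // A write produces no rows, so the node has no output attributes.
  override def output: Seq[Attribute] = Nil
  override protected def withNewChildInternal(newChild: LogicalPlan): WriteOperation =
    copy(child = newChild)
}
```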
@cloud-fan @amaliujia @hvanhovell please take a look!
Should we use IllegalArgumentException here? Or do you feel this needs its own specific exception?
I wanted to have a custom exception for when we rethrow.
If this is a user-facing error, we should actually leverage the error framework we have. cc @gengliangwang @MaxGekk @itholic
I'm happy to fix this as a follow-up, does that make sense?
The errors are reported back through gRPC. If you point me to the right base class, I can fix it then.
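For illustration, a minimal sketch of the custom-exception-on-rethrow approach discussed here; the class name `InvalidCommandInput` and the `planWrite` helper are hypothetical, not the PR's actual code:

```scala
// Hypothetical custom exception for invalid Connect command input.
class InvalidCommandInput(
    message: String,
    cause: Throwable = null)
  extends Exception(message, cause)

// Rethrow with added context while preserving the original cause,
// so the error can be surfaced back over gRPC with a clear message.
def planWrite(): Unit =
  try {
    // ... translate the proto command into a write ...
  } catch {
    case e: IllegalArgumentException =>
      throw new InvalidCommandInput(s"Invalid write command: ${e.getMessage}", e)
  }
```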
@hvanhovell @cloud-fan @HyukjinKwon can you please have a look?
HyukjinKwon left a comment:
LGTM otherwise from my end.
Merging this one.
```proto
Relation input = 1;

// Format value according to the Spark documentation. Examples are: text, parquet, delta.
string source = 2;

// The destination of the write operation must be either a path or a table.
```
In the DataFrame API, people can do `df.write.format("jdbc").option("table", ...).save()`, so the destination is neither a path nor a table. I think an optional table name is sufficient. If the table name is not given, the destination will be figured out from the write options (path is just one write option).
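For illustration, a sketch of such a write, assuming `df` is an existing DataFrame; note that the standard JDBC option key is `dbtable`, and the connection details here are placeholders:

```scala
// The destination is carried entirely by write options:
// no path argument, no table argument to save().
df.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")  // placeholder URL
  .option("dbtable", "target_table")                        // placeholder table
  .option("user", "username")                               // placeholder user
  .save()
```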
```proto
  string path = 3;
  string table_name = 4;
}

SaveMode mode = 5;
```
We added DataFrameWriterV2 because we believe SaveMode is a bad design. It's confusing when we write to a table, as there are so many options: create if not exists, create or replace, replace if exists, append if exists, overwrite data if exists, etc.
Anyway, we need to support save mode in the proto definition to support the existing DataFrame API. If we want to support DataFrameWriterV2 in the Spark Connect client, we should probably have a new proto definition without save mode.
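To illustrate the contrast, a short sketch assuming `df` is an existing DataFrame and `t` is an accessible table identifier:

```scala
import org.apache.spark.sql.SaveMode

// V1: intent is a single SaveMode flag, which conflates several behaviors.
df.write.mode(SaveMode.Overwrite).saveAsTable("t")

// V2 (DataFrameWriterV2): intent is an explicit verb instead.
df.writeTo("t").createOrReplace()  // create the table, or replace it if it exists
df.writeTo("t").append()           // fail if the table does not exist
```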
```proto
}

SaveMode mode = 5;

// List of columns to sort the output by.
repeated string sort_column_names = 6;
```
This should be part of the `BucketBy` message.
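For context, `sortBy` can only be used together with `bucketBy` in the DataFrameWriter API (Spark raises an error otherwise), which is what motivates nesting the sort columns under the bucketing message. A short sketch, with placeholder column and table names, assuming `df` is an existing DataFrame:

```scala
// Sorting the output is tied to bucketing: sortBy without bucketBy fails.
df.write
  .bucketBy(4, "id")              // 4 buckets keyed by a placeholder column
  .sortBy("ts")                   // sort within each bucket
  .saveAsTable("bucketed_events") // placeholder table name
```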
### What changes were proposed in this pull request?
This change adds basic support for writes through the Spark Connect API. In today's implementation of the write API, the `DataFrameWriter` interface is already as declarative as possible. The write support is implemented as a `Command` and does not return anything.

### Why are the changes needed?
Write support through Spark Connect.

### Does this PR introduce _any_ user-facing change?
Experimental API.

### How was this patch tested?
Added new unit tests for the behavior.

Closes apache#38192 from grundprinzip/spark-40737.

Authored-by: Martin Grund <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
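For reference, a minimal sketch of the DataFrameWriter usage that this change makes expressible over Spark Connect, assuming `spark` is an active SparkSession and using a placeholder output path:

```scala
// Build a small DataFrame and write it out; over Connect, this is
// translated into a write Command and returns no result rows.
val df = spark.range(10).toDF("id")
df.write
  .format("parquet")
  .mode("overwrite")
  .save("/tmp/connect-write-demo")  // placeholder path
```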