[SPARK-52853][SDP] Prevent imperative PySpark methods in declarative pipelines #51590
Conversation
Commit: addressing discussion
@sryza @anishm-db Ready for review. Re-scoped the PR to only block imperative Python methods on the client side via a context manager.
```python
@contextmanager
def block_imperative_construct() -> Generator[None, None, None]:
```
Suggested change:

```diff
-def block_imperative_construct() -> Generator[None, None, None]:
+def block_imperative_constructs() -> Generator[None, None, None]:
```
Renamed to `block_session_mutations` to be clearer.
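For readers following the thread, here is a minimal, self-contained sketch of the pattern under discussion: temporarily swapping a method for one that raises, then restoring it on exit. The stand-in `RuntimeConf` class and the plain `RuntimeError` are illustrative assumptions; the actual implementation in this PR patches the real PySpark classes and raises through PySpark's error framework.

```python
from contextlib import contextmanager
from typing import Generator

class RuntimeConf:
    """Stand-in for PySpark's runtime conf class, for illustration only."""
    def set(self, key: str, value: str) -> None:
        print(f"set {key} = {value}")

@contextmanager
def block_session_mutations() -> Generator[None, None, None]:
    """Temporarily replace RuntimeConf.set with a method that raises."""
    original = RuntimeConf.set

    def blocked(self: RuntimeConf, *args: object, **kwargs: object) -> None:
        raise RuntimeError(
            "Session mutation 'RuntimeConf.set' is not allowed in "
            "declarative pipelines. Instead set configuration via the "
            "pipeline spec."
        )

    RuntimeConf.set = blocked
    try:
        yield
    finally:
        # Restore the original method even if the body raised.
        RuntimeConf.set = original
```

Inside the `with block_session_mutations():` block, `RuntimeConf().set(...)` raises; on exit the original method is restored, even when an exception occurred.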
```python
{
    "class": RuntimeConf,
    "method": "set",
    "suggestion": "Instead set configuration via the pipeline spec "
```
It would be better to have this text inside error-conditions.json – that way it's in a central place that can be internationalized more easily. Thoughts on having a sub-error code for each of these? E.g. SET_CURRENT_CATALOG?
Makes sense – added sub-classes for each method in error-conditions.json.
sryza left a comment:
One more tiny nitpick – then LGTM!
| "Session mutation <method> is not allowed in declarative pipelines." | ||
| ], | ||
| "sub_class": { | ||
| "RUNTIME_CONF_SET": { |
Nitpick: should the SET be on the other side of RUNTIME_CONF for consistency?
good catch!
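To make the sub-error-code idea concrete, here is a hedged sketch of how a template in `error-conditions.json` could be resolved into the final message. The parent condition name `SESSION_MUTATION_NOT_ALLOWED` and the `format_error()` helper are illustrative assumptions, not Spark's actual API; only the message text and the `SET_RUNTIME_CONF` sub-class name (post-rename) come from this PR.

```python
# Hedged sketch: resolving a message from an error-conditions-style table.
# SESSION_MUTATION_NOT_ALLOWED and format_error() are illustrative names.
ERROR_CONDITIONS = {
    "SESSION_MUTATION_NOT_ALLOWED": {
        "message": [
            "Session mutation <method> is not allowed in declarative pipelines."
        ],
        "sub_class": {
            "SET_RUNTIME_CONF": {
                "message": ["Instead set configuration via the pipeline spec."]
            }
        },
    }
}

def format_error(condition: str, sub_class: str, params: dict) -> str:
    entry = ERROR_CONDITIONS[condition]
    lines = entry["message"] + entry["sub_class"][sub_class]["message"]
    text = " ".join(lines)
    for name, value in params.items():
        # Placeholders like <method> are filled from message parameters.
        text = text.replace(f"<{name}>", value)
    return text

print(format_error("SESSION_MUTATION_NOT_ALLOWED", "SET_RUNTIME_CONF",
                   {"method": "'RuntimeConf.set'"}))
# Session mutation 'RuntimeConf.set' is not allowed in declarative
# pipelines. Instead set configuration via the pipeline spec.
```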
sryza left a comment:
LGTM! Will merge once build is green.
Follow-up commit (#51690's sibling PR #51680 from HyukjinKwon/SPARK-52853-followup): imports the SDP module when connect dependencies are available.

What changes were proposed in this pull request?
This PR is a followup of #51590 that imports the SDP module when connect dependencies are available.

Why are the changes needed?
To make the builds without Spark Connect dependencies work, e.g., https://github.com/apache/spark/actions/runs/16552497456/job/46809414074

Does this PR introduce any user-facing change?
No, test-only.

How was this patch tested?
Manually.

Was this patch authored or co-authored using generative AI tooling?
No.

Closes #51680. Authored-by: Hyukjin Kwon <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
What changes were proposed in this pull request?

This PR adds a context manager `block_imperative_construct()` that prevents the execution of imperative Spark operations within declarative pipeline definitions. When these blocked methods are called, users receive clear error messages with guidance on declarative alternatives (a hedged sketch of those alternatives follows the lists below).

Blocked Methods

Configuration Management
- `spark.conf.set()` → Use the pipeline spec or the `spark_conf` decorator parameter

Catalog Management
- `spark.catalog.setCurrentCatalog()` → Set via the pipeline spec or the dataset decorator `name` argument
- `spark.catalog.setCurrentDatabase()` → Set via the pipeline spec or the dataset decorator `name` argument

Temporary View Management
- `spark.catalog.dropTempView()` → Remove the temporary view definition directly
- `spark.catalog.dropGlobalTempView()` → Remove the temporary view definition directly
- `DataFrame.createTempView()` → Use the `@temporary_view` decorator
- `DataFrame.createOrReplaceTempView()` → Use the `@temporary_view` decorator
- `DataFrame.createGlobalTempView()` → Use the `@temporary_view` decorator
- `DataFrame.createOrReplaceGlobalTempView()` → Use the `@temporary_view` decorator

UDF Registration
- `spark.udf.register()` → Define and register UDFs before pipeline execution
- `spark.udf.registerJavaFunction()` → Define and register Java UDFs before pipeline execution
- `spark.udf.registerJavaUDAF()` → Define and register Java UDAFs before pipeline execution
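As referenced above, here is a hedged sketch of what the declarative alternatives can look like in a pipeline definition file. It assumes a `pyspark.pipelines` decorator API matching the guidance in the lists (a `spark_conf` decorator parameter and a `@temporary_view` decorator); the exact import path, decorator signatures, and table names are assumptions for illustration, not confirmed API.

```python
# Hedged sketch of a pipeline definition file. The pyspark.pipelines import
# path, decorator signatures, and table names are assumptions.
from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()

# Instead of spark.conf.set(...): pass conf through the decorator's
# spark_conf parameter (or set it in the pipeline spec).
@dp.materialized_view(spark_conf={"spark.sql.shuffle.partitions": "10"})
def daily_counts() -> DataFrame:
    return spark.read.table("events").groupBy("day").count()

# Instead of df.createOrReplaceTempView("clean_events"): declare the view.
@dp.temporary_view
def clean_events() -> DataFrame:
    return spark.read.table("raw_events").where("event_id IS NOT NULL")
```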
These are imperative constructs that can cause friction and unexpected behavior from within a pipeline declaration. E.g., they make pipeline behavior sensitive to the order in which Python files are imported, which can be unpredictable. There are already existing mechanisms for setting Spark confs for pipelines: the pipeline spec and the `spark_conf` decorator parameter.
Does this PR introduce any user-facing change?
Yes, it prevents setting Spark confs (and performing other session mutations) imperatively in the pipeline definition file.
How was this patch tested?
Created a new test suite to verify that the context manager behaves as expected, and ran the `spark-pipelines` CLI manually.
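For a sense of what such a test can look like, here is a hedged unittest-style sketch that reuses the stand-in `RuntimeConf` class and `block_session_mutations()` context manager from the illustrative sketch earlier in this thread; the actual suite added by this PR may be organized differently.

```python
import unittest

# Assumes the stand-in RuntimeConf and block_session_mutations() defined in
# the illustrative sketch earlier in this thread.

class BlockSessionMutationsTest(unittest.TestCase):
    def test_conf_set_blocked_and_restored(self) -> None:
        conf = RuntimeConf()
        with block_session_mutations():
            # Inside the context manager, the mutation raises.
            with self.assertRaises(RuntimeError):
                conf.set("spark.sql.shuffle.partitions", "10")
        # Outside, the original method is restored and works again.
        conf.set("spark.sql.shuffle.partitions", "10")

if __name__ == "__main__":
    unittest.main()
```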
Was this patch authored or co-authored using generative AI tooling?

No