[SPARK-41216][CONNECT][PYTHON] Implement DataFrame.{isLocal, isStreaming, printSchema, inputFiles}
#38742
Conversation
I think we can also put catalog methods like listTables/getTable in AnalysisTask
Catalog APIs don't require a plan; it may be better to have a separate RPC.
Yes, that is why I actually wanted to model each of the Catalog methods as an RPC, because that is closer to the nature of RPC.
Force-pushed from 5c112f0 to b7f7cc2
Do we really want to expose this in Connect? The problem is hash stability: the same client can connect to different Spark versions and get different hashes for the same plan.
will remove it
Are they equal, or do they produce the same result?
one e2e test was added for it
Will remove `semantic_hash` and `same_semantics` since they are developer APIs, although they were also in PySpark.
Honestly this is a client-side thing. They already have the schema, so they can construct it themselves.
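For illustration, a minimal client-side sketch of that idea, assuming flat (non-nested) fields for brevity; the `tree_string` helper is our own name here, not an API from this PR:

```python
from pyspark.sql.types import LongType, StringType, StructField, StructType

# Hypothetical helper: render a schema tree from a StructType the client
# already holds, with no extra round trip to the server. Nested types are
# not handled, to keep the sketch short.
def tree_string(schema: StructType) -> str:
    lines = ["root"]
    for field in schema.fields:
        lines.append(
            f" |-- {field.name}: {field.dataType.simpleString()}"
            f" (nullable = {str(field.nullable).lower()})"
        )
    return "\n".join(lines)

schema = StructType([
    StructField("id", LongType(), False),
    StructField("name", StringType(), True),
])
print(tree_string(schema))
# root
#  |-- id: bigint (nullable = false)
#  |-- name: string (nullable = true)
```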
We also ask the server to provide the string for df.show and df.explain; maybe it's simpler to also do this for printSchema.
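A rough sketch of that server-string approach, mirroring how df.explain is handled in this change; treating `tree_string` as the response field name is an assumption of this sketch:

```python
# Hypothetical method body: ask the server to render the schema tree as
# part of analysis and just print the returned string.
def printSchema(self) -> None:
    if self._plan is None:
        raise Exception("Cannot analyze on empty plan.")
    query = self._plan.to_proto(self._session)
    print(self._session._analyze(query).tree_string)
```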
For just having one optional int, that is a weird message.
is this really useful here?
it had some usages anyway
Again, why an extra message type just to encapsulate an enum?
the message for Explain was not changed, just moved
What does this actually mean here? What is the use case for multiple analysis tasks?
Multiple analysis tasks are for this case: a user can get all attributes in a single RPC and then cache them for reuse.
Why would the request contain so much detail?
doc
There is no symmetry to the request, so it shouldn't be in the request. What is the value of this for the customer? Is this part of the Spark public API?
Do we need this for Spark Connect now?
The methods added here are all public APIs, and are used by users.
printSchema is frequently used, but I also added the others along the way.
grundprinzip
left a comment
I think we need to simplify this change to avoid exposing too many Spark internals.
Could you document what the default value is?
will do
This is a developer API in Dataset; do we really need to provide it in Spark Connect?
Oh, I did not notice that. I am fine with removing sameSemantics and semanticHash.
Force-pushed from 87d5fa2 to 0145a2f
grundprinzip
left a comment
We had an async discussion on this. I request the following changes to the current implementation:
- Analysis is done one RPC at a time; there is no need for a list of tasks.
- AnalysisRequest's only configurable parameter is the EXPLAIN_MODE.
- AnalysisResponse will contain all the information required by other consumers, like `schema`, `is_local`, etc. (see the sketch below).
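A sketch of what consuming that single-RPC shape could look like on the client; this is an illustration only, and the exact response field names below are assumptions:

```python
# One analyze round trip returns everything the client may later need,
# so each DataFrame property becomes a plain field access on the response.
query = self._plan.to_proto(self._session)
resp = self._session._analyze(query)

resp.schema        # the resolved schema of the plan
resp.is_local      # whether collect()/take() can run locally
resp.is_streaming  # whether the plan reads a streaming source
resp.input_files   # best-effort list of files backing the plan
```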
Force-pushed from 0145a2f to 72fcb53
grundprinzip
left a comment
Thank you! This looks much cleaner!
amaliujia
left a comment
LGTM
Thanks for the great simplification!
Force-pushed from 72fcb53 to 536265c
DataFrame.{isLocal, isStreaming, printSchema, inputFiles}
Merged into master, thank you all!
```python
if self._plan is None:
    raise Exception("Cannot analyze on empty plan.")
query = self._plan.to_proto(self._session)
return self._session._analyze(query).is_local
```
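For context, a usage sketch of the APIs this PR implements, against a hypothetical Spark Connect session `spark` with an illustrative input path:

```python
# Illustrative only: the path and the expected results depend on the data.
df = spark.read.parquet("/tmp/example")

df.printSchema()   # prints the server-rendered schema tree
df.isLocal()       # False here: collect() would launch a job
df.isStreaming()   # False: parquet is a batch source
df.inputFiles()    # best-effort list of files backing the DataFrame
```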
are we going to cache the analyze result later?
I think we will do the caching in the near future.
We literally can cache everything for each DataFrame since it is immutable. But I guess we need a design/discussion to clarify the details of how and when.
There is another interesting question: do we want to do caching on the server side?
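For example, a rough client-side memoization sketch, assuming plans are immutable; the `_cached_analysis` attribute and `_analyze_once` helper are hypothetical names, not part of this PR:

```python
# Hypothetical caching inside DataFrame: run the analyze RPC at most once
# per instance and reuse the response for every metadata accessor.
# Assumes `self._cached_analysis = None` is set in __init__.
def _analyze_once(self):
    if self._cached_analysis is None:
        if self._plan is None:
            raise Exception("Cannot analyze on empty plan.")
        query = self._plan.to_proto(self._session)
        self._cached_analysis = self._session._analyze(query)
    return self._cached_analysis

def isLocal(self) -> bool:
    return self._analyze_once().is_local

def isStreaming(self) -> bool:
    return self._analyze_once().is_streaming
```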
…eSemantics`, `_repr_html_`

### What changes were proposed in this pull request?
Disable `semanticHash`, `sameSemantics`, `_repr_html_`.

### Why are the changes needed?
1, Disable `semanticHash`, `sameSemantics` according to the discussions in #38742.
2, Disable `_repr_html_` since it requires [eager mode](https://github.com/apache/spark/blob/40a9a6ef5b89f0c3d19db4a43b8a73decaa173c3/python/pyspark/sql/dataframe.py#L878); otherwise, it just returns `None`:
```
In [2]: spark.range(start=0, end=10)._repr_html_() is None
Out[2]: True
```

### Does this PR introduce _any_ user-facing change?
For these three methods, throw `NotImplementedError`.

### How was this patch tested?
added test cases

Closes #38815 from zhengruifeng/connect_disable_repr_html_sematic.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
…ming, printSchema, inputFiles}`

### What changes were proposed in this pull request?
~~1, Make `AnalyzePlan` support specified multiple analysis tasks, that is, we can get `isLocal`, `schema`, `semanticHash` together in a single RPC if we want.~~
2, Implement the following APIs:
- isLocal
- isStreaming
- printSchema
- ~~semanticHash~~
- ~~sameSemantics~~
- inputFiles

### Why are the changes needed?
for API coverage

### Does this PR introduce _any_ user-facing change?
yes, new APIs

### How was this patch tested?
added UTs

Closes apache#38742 from zhengruifeng/connect_df_print_schema.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
What changes were proposed in this pull request?
~~1, Make `AnalyzePlan` support specified multiple analysis tasks, that is, we can get `isLocal`, `schema`, `semanticHash` together in a single RPC if we want.~~

2, Implement the following APIs:
- isLocal
- isStreaming
- printSchema
- ~~semanticHash~~
- ~~sameSemantics~~
- inputFiles

Why are the changes needed?
for API coverage
Does this PR introduce any user-facing change?
yes, new APIs
How was this patch tested?
added UTs