forked from apache/spark
[pull] master from apache:master #33
Merged
Conversation
…taFrame.dtypes` ### What changes were proposed in this pull request? Implement `DataFrame.__repr__` and `DataFrame.dtypes` ### Why are the changes needed? For api coverage ### Does this PR introduce _any_ user-facing change? yes ### How was this patch tested? added UT Closes #38735 from zhengruifeng/connect_df_repr. Authored-by: Ruifeng Zheng <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
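A minimal usage sketch of the two APIs above (the session `spark` and the sample columns are illustrative, not from the PR):
```python
# Illustrative only: assumes a Spark Connect (or plain PySpark) session named `spark`.
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "name"])

# DataFrame.dtypes returns (column name, simple type string) pairs.
print(df.dtypes)  # [('id', 'bigint'), ('name', 'string')]

# DataFrame.__repr__ renders the schema summary in interactive shells.
print(repr(df))   # DataFrame[id: bigint, name: string]
```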
…ethod ### What changes were proposed in this pull request? This PR updates `_merge_type` method to allow upcast from any `AtomicType` to `StringType` similar to Cast.scala (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L297). This allows us to avoid TypeError errors in the case when it is okay to merge types. For instance, examples below used to fail with TypeError "Can not merge type ... and ..." but pass with this patch. ```python spark.createDataFrame([[1.33, 1], ["2.1", 1]]) spark.createDataFrame([({1: True}, 1), ({"2": False}, 3)]) ``` It also seems to be okay to merge map keys with different types but I would like to call it out explicitly. ### Why are the changes needed? This makes the behaviour between PySpark and Arrow execution more consistent. For example, arrow can handle type upcasts while PySpark cannot. ### Does this PR introduce _any_ user-facing change? Users may notice that examples with schema inference in PySpark DataFrames, that used to fail due to TypeError, run successfully. This is due to extended type merge handling when one of the values is of StringType. When merging AtomicType values with StringType values, the final merged type will be StringType. For example, a combination of double and string values in a column would be cast to StringType: ```python # Both values 1.33 and 2.1 will be cast to strings spark.createDataFrame([[1.33, 1], ["2.1", 1]]) # Result: # ["1.33", 1] # ["2.1", 1] ``` Note that this also applies to nested types. For example, Before: ```python # Throws TypeError "Can not merge type" spark.createDataFrame([({1: True}, 1), ({"2": False}, 3)]) ``` After: ```python spark.createDataFrame([({1: True}, 1), ({"2": False}, 3)]) # Result: # {"1": true} 1 # {"2": false} 3 ``` ### How was this patch tested? I updated the existing unit tests and added a couple of new ones to check that we can upcast to StringType. Closes #38731 from sadikovi/pyspark-type-inference. Authored-by: Ivan Sadikov <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
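A rough, hypothetical sketch of the upcast rule described above (not the actual `_merge_type` code): when one side is `StringType` and the other is any atomic type, the merged type becomes `StringType`.
```python
from pyspark.sql.types import AtomicType, DataType, StringType

def merge_with_string(a: DataType, b: DataType) -> DataType:
    # Hypothetical simplification of the rule added in this PR: any AtomicType
    # can be upcast to StringType when the other side is a string.
    if isinstance(a, StringType) and isinstance(b, AtomicType):
        return StringType()
    if isinstance(b, StringType) and isinstance(a, AtomicType):
        return StringType()
    if type(a) is type(b):
        return a
    raise TypeError(f"Can not merge type {a} and {b}")
```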
…larger than 2GB bytes array limit` in `DatasetLargeResultCollectingSuite` ### What changes were proposed in this pull request? This PR ignores `collect data with single partition larger than 2GB bytes array limit` in `DatasetLargeResultCollectingSuite` by default because it cannot run successfully with Spark's default Java options. ### Why are the changes needed? Avoid local test failure. ### Does this PR introduce _any_ user-facing change? No, just for tests ### How was this patch tested? - Pass GA - Manual test: in my test environment, after changing `-Xmx4g` to `-Xmx10g`, both Maven and SBT tests pass. Closes #38704 from LuciferYang/SPARK-41193. Authored-by: yangjie01 <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request? Implement `DataFrame.drop` with a proto message ### Why are the changes needed? for api coverage ### Does this PR introduce _any_ user-facing change? Yes ### How was this patch tested? added UT Closes #38686 from zhengruifeng/connect_df_drop. Authored-by: Ruifeng Zheng <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
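A minimal usage sketch (the session and columns are illustrative):
```python
# Illustrative only: assumes an existing session `spark`.
df = spark.createDataFrame([(1, "a", True)], ["id", "name", "flag"])
print(df.drop("flag").columns)  # ['id', 'name']
```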
…3 test pass ### What changes were proposed in this pull request? This PR is a followup of #38631 that fixes the test to pass in Scala 2.13 by avoiding using `Buffer` that becomes `List` in Scala 2.13. ### Why are the changes needed? To fix up the Scala 2.13 build, see https://github.com/apache/spark/actions/runs/3517345801/jobs/5895043640 ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Manually tested. Closes #38752 from HyukjinKwon/SPARK-40809-followup. Authored-by: Hyukjin Kwon <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request? In the PR, I propose a new error class `FAILED_FUNCTION_CALL` for errors that might occur while preparing a function's class, for instance during creation of expression instances. Also, the PR propagates `SparkThrowable` to users when handling any exceptions coming from the preparation of function calls. ### Why are the changes needed? To improve the user experience with Spark SQL; in particular, the PR makes errors clearer. ### Does this PR introduce _any_ user-facing change? Yes, it affects the user-facing errors. ### How was this patch tested? By running the modified test suites: ``` $ build/sbt "core/testOnly *SparkThrowableSuite" $ PYSPARK_PYTHON=python3 build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite" ``` Closes #38744 from MaxGekk/failed-builtin-func. Authored-by: Max Gekk <[email protected]> Signed-off-by: Max Gekk <[email protected]>
…o `COLUMN_ALREADY_EXISTS` ### What changes were proposed in this pull request? In the PR, I propose to assign the proper name `COLUMN_ALREADY_EXISTS` to the legacy error class `_LEGACY_ERROR_TEMP_1233`, and modify the test suites to use `checkError()`, which checks the error class name, context, etc. Also this PR improves the error message. ### Why are the changes needed? A proper name improves the user experience with Spark SQL. ### Does this PR introduce _any_ user-facing change? Yes, the PR changes a user-facing error message. ### How was this patch tested? By running the modified test suites: ``` $ PYSPARK_PYTHON=python3 build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite" $ build/sbt -Phive-2.3 "testOnly *HiveSQLInsertTestSuite" ``` Closes #38685 from MaxGekk/columns-already-exist. Authored-by: Max Gekk <[email protected]> Signed-off-by: Max Gekk <[email protected]>
### What changes were proposed in this pull request? Implement `DataFrame.isEmpty` ### Why are the changes needed? API Coverage ### Does this PR introduce _any_ user-facing change? Yes, new api ### How was this patch tested? added UT Closes #38734 from zhengruifeng/connect_df_is_empty. Authored-by: Ruifeng Zheng <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
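A minimal usage sketch (illustrative session assumed):
```python
# Illustrative only: assumes an existing session `spark`.
print(spark.range(0).isEmpty())  # True
print(spark.range(5).isEmpty())  # False
```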
…PTY_LOCATION` ### What changes were proposed in this pull request? This PR proposes to rename `UNSUPPORTED_EMPTY_LOCATION` to `INVALID_EMPTY_LOCATION`. ### Why are the changes needed? An error class and its message should be clear and brief, and should not ambiguously suggest that something may be supported in the future. ### Does this PR introduce _any_ user-facing change? The error message changes from ``` "Unsupported empty location." ``` to ``` "The location name cannot be empty string, but `...` was given." ``` ### How was this patch tested? ``` $ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*" $ build/sbt "core/testOnly *SparkThrowableSuite" ``` Closes #38650 from itholic/SPARK-41135. Authored-by: itholic <[email protected]> Signed-off-by: Max Gekk <[email protected]>
### What changes were proposed in this pull request? Update Hive table stats directly, without converting HiveTable -> CatalogTable -> HiveTable. `HiveExternalCatalog.alterTableStats()` converts a raw HiveTable to a CatalogTable, which stores the schema as lowercase but keeps the bucket columns as they are. `HiveClientImpl.alterTable()` then throws a `Bucket columns V1 is not part of the table columns` exception when re-converting the CatalogTable to a HiveTable. ### Why are the changes needed? Bug fix, refer to #32675 (comment) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Updated existing UT Closes #38495 from wankunde/write_stats_directly. Authored-by: Kun Wan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request? Support using RocksDB as the KVStore in the live UI. The location of RocksDB can be set via the configuration `spark.ui.store.path`. The configuration is optional; the default KV store will still be in memory. Note: let's make it RocksDB only for now instead of supporting both LevelDB and RocksDB. RocksDB is built on LevelDB with improvements on writes and reads. Furthermore, we can reuse the RocksDBFileManager in streaming for replicating the local RocksDB files to DFS. The replication in DFS can be used for the Spark History Server. ### Why are the changes needed? The current architecture of the Spark live UI and the Spark History Server (SHS) is too simple to serve large clusters and heavy workloads: - Spark stores all the live UI data in memory. The size can be a few GBs and affects the driver's stability (OOM). - There is a limitation of storing only 1000 queries. Note that we can’t simply increase the limitation under the current architecture. I did memory profiling: storing one query execution detail can take 800KB, while storing one task requires 0.3KB. So for 1000 SQL queries with 1000 * 2000 tasks, the memory usage for query execution and task data will be 1.4GB. Spark UI stores UI data for jobs/stages/executors as well, so storing 10k queries may take more than 14GB. - SHS has to parse the JSON-format event log on initial start. The uncompressed event logs can be as big as a few GBs, and the parsing can be quite slow; some users reported they had to wait for more than half an hour. With RocksDB as the KVStore, we can improve the stability of the Spark driver and speed up the startup of SHS. ### Does this PR introduce _any_ user-facing change? Yes, supporting RocksDB as the KVStore in the live UI. The location of RocksDB can be set via the configuration `spark.ui.store.path`. The configuration is optional; the default KV store will still be in memory. ### How was this patch tested? New UT Preview of the doc change: <img width="895" alt="image" src="https://user-images.githubusercontent.com/1097932/203184691-b6815990-b7b0-422b-aded-8e1771c0c167.png"> Closes #38567 from gengliangwang/liveUIKVStore. Authored-by: Gengliang Wang <[email protected]> Signed-off-by: Gengliang Wang <[email protected]>
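A hedged sketch of how the new configuration could be set when building a session; only the `spark.ui.store.path` key comes from this PR, while the path and app name are made-up examples:
```python
from pyspark.sql import SparkSession

# Example only: point the live UI KVStore at a local RocksDB directory.
# If spark.ui.store.path is unset, the store stays in memory (the default).
spark = (
    SparkSession.builder
    .appName("rocksdb-ui-store-example")                    # example app name
    .config("spark.ui.store.path", "/tmp/spark-ui-store")   # example path
    .getOrCreate()
)
```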
…thon client ### What changes were proposed in this pull request? Implement `DataFrame.SelectExpr` in Python client. `SelectExpr` also has a good amount of usage. ### Why are the changes needed? API coverage. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #38723 from amaliujia/support_select_expr_in_python. Authored-by: Rui Wang <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
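A minimal usage sketch (illustrative session and data):
```python
# Illustrative only: assumes an existing session `spark`.
df = spark.createDataFrame([(2, "a"), (3, "b")], ["age", "name"])
df.selectExpr("age * 2 AS doubled", "upper(name) AS name").show()
```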
### What changes were proposed in this pull request? Reenable test_fill_na ### Why are the changes needed? `test_fill_na` was disabled by mistake in #38723 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? reenabled test Closes #38763 from zhengruifeng/connect_reenable_test_fillna. Authored-by: Ruifeng Zheng <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
### What changes were proposed in this pull request? The original PR to introduce the error class `PATH_NOT_FOUND` was reverted since it broke the tests in a different test environment. This PR proposes to restore it. ### Why are the changes needed? Restoring the reverted changes with a proper fix. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The existing CI should pass. Closes #38575 from itholic/SPARK-40948-followup. Authored-by: itholic <[email protected]> Signed-off-by: Max Gekk <[email protected]>
…on` stable to fix `COLUMN_ALREADY_EXISTS` check failed with Scala 2.13 ### What changes were proposed in this pull request? This PR adds a sort where `columnAlreadyExistsError` will be thrown to make the result of `SchemaUtils#checkColumnNameDuplication` stable. ### Why are the changes needed? Fix the `COLUMN_ALREADY_EXISTS` check failure with Scala 2.13. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Pass GA - Manual test: ``` dev/change-scala-version.sh 2.13 build/sbt clean "sql/testOnly org.apache.spark.sql.DataFrameSuite" -Pscala-2.13 build/sbt "sql/testOnly org.apache.spark.sql.execution.datasources.json.JsonV1Suite" -Pscala-2.13 build/sbt "sql/testOnly org.apache.spark.sql.execution.datasources.json.JsonV2Suite" -Pscala-2.13 build/sbt "sql/testOnly org.apache.spark.sql.execution.datasources.json.JsonLegacyTimeParserSuite" -Pscala-2.13 ``` All tests passed Closes #38764 from LuciferYang/SPARK-41206. Authored-by: yangjie01 <[email protected]> Signed-off-by: Max Gekk <[email protected]>
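A hypothetical Python illustration of the idea (the actual change is in the Scala `SchemaUtils` code): sorting the duplicated names makes the reported column deterministic regardless of the collection's iteration order.
```python
from collections import Counter

def first_duplicate(column_names, case_sensitive=False):
    # Sort the duplicated names so the error always reports the same column,
    # no matter how the underlying collection orders its elements.
    normalized = column_names if case_sensitive else [c.lower() for c in column_names]
    duplicates = sorted(name for name, count in Counter(normalized).items() if count > 1)
    return duplicates[0] if duplicates else None

print(first_duplicate(["a", "B", "b", "c"]))  # 'b'
```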
### What changes were proposed in this pull request? This is a followup of #38302 . For events generated by old versions of Spark, which do not have the new `errorMessage` field, we should use the old way to detect query execution status (failed or not). This PR also adds a UI test for the expected behavior. ### Why are the changes needed? backward compatibility ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new tests Closes #38747 from cloud-fan/ui. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request? This PR upgrades Janino from 3.1.7 to 3.1.9. ### Why are the changes needed? This version brings some improvements and bug fixes, and Janino 3.1.9 no longer tests Java 12, 15, and 16 because these STS versions have reached EOL: - janino-compiler/janino@v3.1.7...v3.1.9 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Pass GitHub Actions - Manually tested this PR with Scala 2.13; all tests passed Closes #38075 from LuciferYang/SPARK-40633. Lead-authored-by: yangjie01 <[email protected]> Co-authored-by: YangJie <[email protected]> Signed-off-by: Sean Owen <[email protected]>
### What changes were proposed in this pull request? This PR upgrades the slf4j-related dependencies from 2.0.3 to 2.0.4. ### Why are the changes needed? A bug-fix version: - [LoggerFactory only loads services from context class loader](https://jira.qos.ch/browse/SLF4J-544) The release notes are as follows: - https://www.slf4j.org/news.html#2.0.4 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GA Closes #38758 from LuciferYang/SPARK-41223. Authored-by: yangjie01 <[email protected]> Signed-off-by: Sean Owen <[email protected]>
### What changes were proposed in this pull request? In the PR, I propose to move the error sub-classes related to number format issues out of `DATATYPE_MISMATCH` into their own error class `INVALID_FORMAT`. Also, the PR deletes the error class `INVALID_LIKE_PATTERN` and moves its sub-classes to `INVALID_FORMAT`. ### Why are the changes needed? To improve the usability of Spark SQL and to avoid confusing users with error class names that refer to data types when the errors are actually about values. ### Does this PR introduce _any_ user-facing change? Yes, the PR changes some user-facing errors. ### How was this patch tested? By running the affected test suites: ``` $ PYSPARK_PYTHON=python3 build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite" $ build/sbt "core/testOnly *SparkThrowableSuite" $ build/sbt "test:testOnly *StringExpressionsSuite" ``` Closes #38755 from MaxGekk/refactor-datatype-mismatch. Authored-by: Max Gekk <[email protected]> Signed-off-by: Max Gekk <[email protected]>
…d `format` of `to_binary()` ### What changes were proposed in this pull request? This PR overrides the `checkInputDataTypes()` method of the `ToBinary` function to propagate an error class to users for an invalid `format`. ### Why are the changes needed? Migration onto error classes unifies Spark SQL error messages. ### Does this PR introduce _any_ user-facing change? Yes. The PR changes user-facing error messages. ### How was this patch tested? Pass GitHub Actions Closes #38737 from LuciferYang/SPARK-41174. Authored-by: yangjie01 <[email protected]> Signed-off-by: Max Gekk <[email protected]>
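A small illustration of the function whose `format` argument is now validated (illustrative session; the exact error text depends on the new error class):
```python
# Valid formats such as 'utf-8', 'hex', and 'base64' work as before.
spark.sql("SELECT to_binary('abc', 'utf-8')").show()

# An invalid format is rejected at analysis time with the clearer error message.
spark.sql("SELECT to_binary('abc', 'invalidFormat')").show()  # expected to raise an analysis error
```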
…EADME ### What changes were proposed in this pull request? Update the protobuf version to be `3.19.3` in README. ### Why are the changes needed? Since we have homogenized the protobuf version across server and client to be `3.19.3`, we should update the README to be consistent. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #38775 from xinrong-meng/new_readme. Authored-by: Xinrong Meng <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
### What changes were proposed in this pull request? For a better user experience, we need to properly throw exceptions for functions that are not supported in Spark Connect. ### Why are the changes needed? User experience ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38762 from grundprinzip/SPARK-41225. Authored-by: Martin Grund <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
…tionRules to injectPlanNormalizationRule ### What changes were proposed in this pull request? Followup of #38692. To follow other APIs in `SparkSessionExtensions`, the name should be `inject...Rule` and `build...Rules`. ### Why are the changes needed? typo fix ### Does this PR introduce _any_ user-facing change? not a released API ### How was this patch tested? n/a Closes #38767 from cloud-fan/small. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request? 1, remove `__init__.py` 2, rename `ColumnOrString` to `ColumnOrName` to be the same as PySpark ### Why are the changes needed? 1, there are two typing files now, `_typing.py` and `__init__.py`; they are used in different files, which is very confusing; 2, the definitions of `LiteralType` are different, and the old one in `_typing.py` was never used; 3, both `ColumnOrString` and `ColumnOrName` are used now; ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing UTs Closes #38757 from zhengruifeng/connect_typing. Authored-by: Ruifeng Zheng <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
…tables when building Spark Protobuf ### What changes were proposed in this pull request? This PR adds a `user-defined-protoc` profile (activated with `-Puser-defined-protoc`) so that users can build and test the `protobuf` module with custom `protoc` executables. ### Why are the changes needed? As described in [SPARK-41215](https://issues.apache.org/jira/browse/SPARK-41215), the latest versions of `protoc` have minimum version requirements for basic libraries such as `glibc` and `glibcxx`. Because of that, it is not possible to test-compile the `protobuf` module out of the box on CentOS 6 or CentOS 7. Instead, the following error messages are shown: ``` [ERROR] /home/disk1/spark-ut/spark/connector/protobuf/src/test/resources/protobuf/timestamp.proto [0:0]: /tmp/protoc6599263403262688374.exe: /lib64/libc.so.6: version `GLIBC_2.14' not found (required by /tmp/protoc6599263403262688374.exe) [ERROR] /home/disk1/spark-ut/spark/connector/protobuf/src/test/resources/protobuf/timestamp.proto [0:0]: /tmp/protoc6599263403262688374.exe: /usr/lib64/libstdc++.so.6: version `GLIBCXX_3.4.18' not found (required by /tmp/protoc6599263403262688374.exe) [ERROR] /home/disk1/spark-ut/spark/connector/protobuf/src/test/resources/protobuf/timestamp.proto [0:0]: /tmp/protoc6599263403262688374.exe: /usr/lib64/libstdc++.so.6: version `GLIBCXX_3.4.14' not found (required by /tmp/protoc6599263403262688374.exe) [ERROR] /home/disk1/spark-ut/spark/connector/protobuf/src/test/resources/protobuf/timestamp.proto [0:0]: /tmp/protoc6599263403262688374.exe: /usr/lib64/libstdc++.so.6: version `CXXABI_1.3.5' not found (required by /tmp/protoc6599263403262688374.exe) ``` ### Does this PR introduce _any_ user-facing change? No, the default way of using the official pre-release `protoc` binary files is still activated. ### How was this patch tested? - Pass GitHub Actions - Manual test on CentOS6u3 and CentOS7u4 ```bash export PROTOBUF_PROTOC_EXEC_PATH=/path-to-protoc-exe ./build/mvn clean install -pl connector/protobuf -Puser-defined-protoc -am -DskipTests ./build/mvn clean test -pl connector/protobuf -Puser-defined-protoc ``` and ```bash export PROTOBUF_PROTOC_EXEC_PATH=/path-to-protoc-exe ./build/sbt clean "protobuf/compile" -Puser-defined-protoc ./build/sbt "protobuf/test" -Puser-defined-protoc ``` Closes #38743 from WolverineJiang/master. Authored-by: jianghaonan <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request? This PR supports local data for `LocalRelation`; we decided to use the Arrow IPC batch format to transfer the data. The schema is embedded in the binary records, so we can remove the `attributes` field from `LocalRelation`. ### Why are the changes needed? It's necessary to have local data to do unit testing and validation. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? unit test. Closes #38659 from dengziming/SPARK-41114. Authored-by: dengziming <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
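A hedged sketch (using `pyarrow` directly, not the Spark Connect client code) of the Arrow IPC stream format mentioned above, showing how the schema travels inside the serialized bytes:
```python
import pyarrow as pa

# Build a small local table and serialize it as an Arrow IPC stream.
table = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)
data = sink.getvalue().to_pybytes()  # bytes suitable for a LocalRelation-style payload

# The schema is recoverable from the bytes alone, which is why the separate
# `attributes` field can be dropped.
reader = pa.ipc.open_stream(data)
print(reader.schema)
```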
### What changes were proposed in this pull request? This PR proposes to correct the minor syntax of the error message for `UNEXPECTED_INPUT_TYPE`. ### Why are the changes needed? The error message should start with an upper-case character and be clear to read. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? ``` ./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*" ``` Closes #38766 from itholic/minor-UNEXPECTED_INPUT_TYPE. Lead-authored-by: itholic <[email protected]> Co-authored-by: Haejoon Lee <[email protected]> Signed-off-by: Max Gekk <[email protected]>
…P_1042 ### What changes were proposed in this pull request? In the PR, I propose to assign a name to the error class _LEGACY_ERROR_TEMP_1042. ### Why are the changes needed? Proper names of error classes should improve user experience with Spark SQL. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. Closes #38707 from panbingkun/SPARK-41176. Authored-by: panbingkun <[email protected]> Signed-off-by: Max Gekk <[email protected]>
…P_1092 ### What changes were proposed in this pull request? In the PR, I propose to assign the name `INVALID_SCHEMA` to the error class `_LEGACY_ERROR_TEMP_1092`. ### Why are the changes needed? Proper names of error classes should improve user experience with Spark SQL. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. Closes #38710 from panbingkun/SPARK-41179. Authored-by: panbingkun <[email protected]> Signed-off-by: Max Gekk <[email protected]>
…P_1102 ### What changes were proposed in this pull request? In the PR, I propose to assign the name `INVALID_EXTRACT_FIELD` to the error class `_LEGACY_ERROR_TEMP_1102`. ### Why are the changes needed? Proper names of error classes should improve user experience with Spark SQL. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. Closes #38725 from panbingkun/SPARK-41182. Authored-by: panbingkun <[email protected]> Signed-off-by: Max Gekk <[email protected]>
…optional pandas dataframe ### What changes were proposed in this pull request? The server guarantees to send at least one Arrow batch with the schema even when the result is empty. In this case, `DataFrame.toPandas` can always return a pandas DataFrame. This PR decouples the client-side execution paths for `Command` and `Relation` to remove `Optional` from the return type of `DataFrame.toPandas`. ### Why are the changes needed? API coverage. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38786 from amaliujia/returned_pandas_not_optional. Authored-by: Rui Wang <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
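A minimal illustration of the guarantee described above (illustrative session):
```python
# Even an empty result now yields a pandas DataFrame that carries the schema.
empty = spark.range(5).where("id < 0").toPandas()
print(type(empty))           # <class 'pandas.core.frame.DataFrame'>
print(list(empty.columns))   # ['id']
print(len(empty))            # 0
```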
…ng query ### What changes were proposed in this pull request? This PR proposes to add a new test suite specifically for self-union tests on streaming query. The test cases are acceptance tests for 4 different cases, DSv1 vs DSv2 / DataStreamReader API vs table API. ### Why are the changes needed? This PR brings more test coverage on streaming workloads. We should have caught an issue during the work of [SPARK-39564](https://issues.apache.org/jira/browse/SPARK-39564) if we had this test suite. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New test suite. Closes #38785 from HeartSaVioR/SPARK-41249. Authored-by: Jungtaek Lim <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
… implementation to stream from server to client ### What changes were proposed in this pull request? This PR proposes an optimized Arrow-based collect; it is virtually #38720, implementing the same logic except for a couple of nits. ### Why are the changes needed? To stream the Arrow batches from the server to the client side instead of waiting for all the jobs to finish. ### Does this PR introduce _any_ user-facing change? No, this feature isn't released yet. ### How was this patch tested? Unit test added. Closes #38720 Closes #38759 from HyukjinKwon/SPARK-41165-followup. Authored-by: Hyukjin Kwon <[email protected]> Signed-off-by: Herman van Hovell <[email protected]>
…nversion base on Scala 2.13 code ### What changes were proposed in this pull request? This PR cleans up redundant collection conversions based on the Scala 2.13 code. ### Why are the changes needed? Code cleanup. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Pass GitHub Actions Closes #38598 from LuciferYang/redundant-collection-conversion. Lead-authored-by: YangJie <[email protected]> Co-authored-by: yangjie01 <[email protected]> Signed-off-by: Sean Owen <[email protected]>
… Protobuf connector ### What changes were proposed in this pull request? Unify the Protobuf versions in Spark connect and Protobuf connector. ### Why are the changes needed? The Protobuf dependencies will have the same behavior. And future upgrades will be easier. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? GA tests Closes #38783 from gengliangwang/unifyProtobufVersion. Lead-authored-by: Gengliang Wang <[email protected]> Co-authored-by: Gengliang Wang <[email protected]> Signed-off-by: Gengliang Wang <[email protected]>
### What changes were proposed in this pull request? This PR proposes upgrading pandas to 1.5.2 for the pandas API on Spark. The new version of pandas (1.5.2) was released on Nov 22, 2022 and brings some bug fixes; the release notes are as follows: https://pandas.pydata.org/pandas-docs/dev/whatsnew/v1.5.2.html ### Why are the changes needed? We should follow the behavior of the latest pandas and support it. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. Closes #38787 from panbingkun/upgrade_pandas_1.5.2. Authored-by: panbingkun <[email protected]> Signed-off-by: Yikun Jiang <[email protected]>
### What changes were proposed in this pull request? 1, on the server side, make the `proto_datatype` <-> `catalyst_datatype` conversion support all the built-in SQL datatypes; 2, on the client side, make the `proto_datatype` <-> `pyspark_catalyst_datatype` conversion support [all the datatypes that are supported in PySpark now.](https://github.com/apache/spark/blob/master/python/pyspark/sql/types.py#L60-L83) ### Why are the changes needed? Right now, only `long`, `string`, and `struct` are supported: ``` grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: status = StatusCode.UNKNOWN details = "Does not support convert float to connect proto types." debug_error_string = "{"created":"1669206685.760099000","description":"Error received from peer ipv6:[::1]:15002","file":"src/core/lib/surface/call.cc","file_line":1064,"grpc_message":"Does not support convert float to connect proto types.","grpc_status":2}" ``` This PR makes the schema and literal expressions support more datatypes. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? added UT Closes #38770 from zhengruifeng/connect_support_more_datatypes. Authored-by: Ruifeng Zheng <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
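An illustrative example of a schema that previously failed on the Python client because only a few types were handled (float is the case from the error above); the column names are made up:
```python
# Illustrative only: types beyond long/string/struct in schema and literals.
df = spark.createDataFrame(
    [(1.5, True, bytearray(b"ab"))],
    "value float, flag boolean, payload binary",
)
print(df.dtypes)  # [('value', 'float'), ('flag', 'boolean'), ('payload', 'binary')]
```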
… type ### What changes were proposed in this pull request? This PR proposes that Relations (e.g. Aggregate in this PR) should only deal with `Expression` rather than `str`. A `str` could be mapped to different expressions (e.g. a SQL expression, an unresolved_attribute, etc.). Relations are not supposed to understand the different meanings of `str`; the DataFrame should understand them. This PR specifically changes `Aggregate`. ### Why are the changes needed? Codebase refactoring. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38768 from amaliujia/SPARK-41230. Authored-by: Rui Wang <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
### What changes were proposed in this pull request? Upgrade actions/labeler to v4 ### Why are the changes needed? https://github.com/actions/labeler/releases/tag/v4.0.0 uses Node 16, which cleans up the warning: `Node.js 12 actions are deprecated. For more information see: https://github.blog/changelog/2022-09-22-github-actions-all-actions-will-begin-running-on-node16-instead-of-node12/. Please update the following actions to use Node.js 16: actions/labeler5f867a63be70efff62b767459b009290364495eb` seen in e.g. https://github.com/apache/spark/actions/runs/3540940080 ### Does this PR introduce _any_ user-facing change? No, dev only ### How was this patch tested? Tested in my local repo based on this patch: Yikun#188 Yikun#189 Closes #38794 from Yikun/label. Authored-by: Yikun Jiang <[email protected]> Signed-off-by: Yikun Jiang <[email protected]>
…ming, printSchema, inputFiles}` ### What changes were proposed in this pull request? ~~1, Make `AnalyzePlan` support specified multiple analysis tasks, that is, we can get `isLocal`, `schema`, `semanticHash` together in single RPC if we want.~~ 2, Implement following APIs - isLocal - isStreaming - printSchema - ~~semanticHash~~ - ~~sameSemantics~~ - inputFiles ### Why are the changes needed? for API coverage ### Does this PR introduce _any_ user-facing change? yes, new APIs ### How was this patch tested? added UTs Closes #38742 from zhengruifeng/connect_df_print_schema. Authored-by: Ruifeng Zheng <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
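A minimal usage sketch of the newly implemented APIs, mirroring the existing PySpark DataFrame API (illustrative session):
```python
df = spark.range(3)
df.printSchema()        # root
                        #  |-- id: long (nullable = false)
print(df.isLocal())     # whether collect()/take() can run locally without executors
print(df.isStreaming)   # False for a batch DataFrame
print(df.inputFiles())  # [] here; file paths for file-based sources
```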
### What changes were proposed in this pull request? The pr aims to migrate the map options errors onto error classes. ### Why are the changes needed? The changes improve the error framework. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. Closes #38730 from panbingkun/SPARK-41181. Authored-by: panbingkun <[email protected]> Signed-off-by: Max Gekk <[email protected]>
### What changes were proposed in this pull request? This PR aims to upgrade Apache Arrow to 10.0.1. ### Why are the changes needed? This version brings some bug fixes; the release notes are as follows: apache/arrow@f786b8d ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. Closes #38788 from panbingkun/upgrade_arrow_10.0.1. Authored-by: panbingkun <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
### What changes were proposed in this pull request? Upgrade postgresql from 42.5.0 to 42.5.1 [Changelog](https://jdbc.postgresql.org/changelogs/2022-11-23-42.5.1-release/) ### Why are the changes needed? [CVE-2022-41946](https://nvd.nist.gov/vuln/detail/CVE-2022-41946) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA Closes #38791 from bjornjorgensen/postgressql-42.5.1. Authored-by: Bjørn <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
### What changes were proposed in this pull request? Upgrade Protobuf from 3.19.4 to 3.19.5 ### Why are the changes needed? [CVE-2022-1941](https://nvd.nist.gov/vuln/detail/CVE-2022-1941) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA Closes #38774 from bjornjorgensen/protobuf-3.19.5. Lead-authored-by: Bjørn Jørgensen <[email protected]> Co-authored-by: Bjorn Jorgensen <[email protected]> Co-authored-by: Bjørn <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
pull bot pushed a commit that referenced this pull request on Nov 1, 2025
### What changes were proposed in this pull request? This PR proposes to add a `doCanonicalize` function for DataSourceV2ScanRelation. The implementation is similar to [the one in BatchScanExec](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150), as well as [the one in LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52). ### Why are the changes needed? Query optimization rules such as MergeScalarSubqueries check whether two plans are identical by [comparing their canonicalized form](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L219). For DSv2, in the physical plan the canonicalization goes down the child hierarchy to the BatchScanExec, which [has a doCanonicalize function](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150); in the logical plan, the canonicalization goes down to the DataSourceV2ScanRelation, which, however, does not have a doCanonicalize function. As a result, two logical plans that are semantically identical are not identified as such. Moreover, for reference, [DSv1 LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52) also has `doCanonicalize()`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? A new unit test is added to show that `MergeScalarSubqueries` is working for DataSourceV2ScanRelation.
For a query ```sql select (select max(i) from df) as max_i, (select min(i) from df) as min_i ``` Before introducing the canonicalization, the plan is ``` == Parsed Logical Plan == 'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5] : :- 'Project [unresolvedalias('max('i))] : : +- 'UnresolvedRelation [df], [], false : +- 'Project [unresolvedalias('min('i))] : +- 'UnresolvedRelation [df], [], false +- OneRowRelation == Analyzed Logical Plan == max_i: int, min_i: int Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5] : :- Aggregate [max(i#0) AS max(i)#7] : : +- SubqueryAlias df : : +- View (`df`, [i#0, j#1]) : : +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 : +- Aggregate [min(i#10) AS min(i)#9] : +- SubqueryAlias df : +- View (`df`, [i#10, j#11]) : +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 +- OneRowRelation == Optimized Logical Plan == Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5] : :- Aggregate [max(i#0) AS max(i)#7] : : +- Project [i#0] : : +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 : +- Aggregate [min(i#10) AS min(i)#9] : +- Project [i#10] : +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 +- OneRowRelation == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 0 +- *(1) Project [Subquery subquery#2, [id=#32] AS max_i#3, Subquery subquery#4, [id=#33] AS min_i#5] : :- Subquery subquery#2, [id=#32] : : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=58] +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14]) +- *(1) Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19] +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14]) +- Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] : +- Subquery subquery#4, [id=#33] : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63] +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15]) +- *(1) Project [i#10] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30] +- HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15]) +- Project [i#10] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- *(1) Scan OneRowRelation[] +- == 
Initial Plan == Project [Subquery subquery#2, [id=#32] AS max_i#3, Subquery subquery#4, [id=#33] AS min_i#5] : :- Subquery subquery#2, [id=#32] : : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=58] +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14]) +- *(1) Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19] +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14]) +- Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] : +- Subquery subquery#4, [id=#33] : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63] +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15]) +- *(1) Project [i#10] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30] +- HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15]) +- Project [i#10] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- Scan OneRowRelation[] ``` After introducing the canonicalization, the plan is as following, where you can see **ReusedSubquery** ``` == Parsed Logical Plan == 'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5] : :- 'Project [unresolvedalias('max('i))] : : +- 'UnresolvedRelation [df], [], false : +- 'Project [unresolvedalias('min('i))] : +- 'UnresolvedRelation [df], [], false +- OneRowRelation == Analyzed Logical Plan == max_i: int, min_i: int Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5] : :- Aggregate [max(i#0) AS max(i)#7] : : +- SubqueryAlias df : : +- View (`df`, [i#0, j#1]) : : +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 : +- Aggregate [min(i#10) AS min(i)#9] : +- SubqueryAlias df : +- View (`df`, [i#10, j#11]) : +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 +- OneRowRelation == Optimized Logical Plan == Project [scalar-subquery#2 [].max(i) AS max_i#3, scalar-subquery#4 [].min(i) AS min_i#5] : :- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] : : +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9] : : +- Project [i#0] : : +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 : +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] : +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9] : +- Project [i#0] : +- RelationV2[i#0, j#1] class 
org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5 +- OneRowRelation == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 0 +- *(1) Project [Subquery subquery#2, [id=#40].max(i) AS max_i#3, ReusedSubquery Subquery subquery#2, [id=#40].min(i) AS min_i#5] : :- Subquery subquery#2, [id=#40] : : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71] +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17]) +- *(1) Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22] +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17]) +- Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] : +- ReusedSubquery Subquery subquery#2, [id=#40] +- *(1) Scan OneRowRelation[] +- == Initial Plan == Project [Subquery subquery#2, [id=#40].max(i) AS max_i#3, Subquery subquery#4, [id=#41].min(i) AS min_i#5] : :- Subquery subquery#2, [id=#40] : : +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == ResultQueryStage 1 +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9]) +- ShuffleQueryStage 0 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71] +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17]) +- *(1) Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] +- == Initial Plan == Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22] +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17]) +- Project [i#0] +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: [] : +- Subquery subquery#4, [id=#41] : +- AdaptiveSparkPlan isFinalPlan=false : +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14] : +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9]) : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=37] : +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17]) : +- Project [i#0] : +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder 
RuntimeFilters: [] +- Scan OneRowRelation[] ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52529 from yhuang-db/scan-canonicalization. Authored-by: yhuang-db <[email protected]> Signed-off-by: Peter Toth <[email protected]>
huangxiaopingRD pushed a commit that referenced this pull request on Nov 25, 2025
See Commits and Changes for more details.
Created by pull[bot]
Can you help keep this open source service alive? 💖 Please sponsor : )