forked from apache/spark
-
Notifications
You must be signed in to change notification settings - Fork 0
update my fork #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
### What changes were proposed in this pull request? JDBC connection providers implementation formatted in a wrong way. In this PR I've fixed the formatting. ### Why are the changes needed? Wrong spacing in JDBC connection providers. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unit tests. Closes #28945 from gaborgsomogyi/provider_spacing. Authored-by: Gabor Somogyi <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…e example for `timestamp_seconds`
### What changes were proposed in this pull request?
Modify the example for `timestamp_seconds` and replace `collect()` by `show()`.
### Why are the changes needed?
The SQL config `spark.sql.session.timeZone` doesn't influence on the `collect` in the example. The code below demonstrates that:
```
$ export TZ="UTC"
```
```python
>>> from pyspark.sql.functions import timestamp_seconds
>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> time_df = spark.createDataFrame([(1230219000,)], ['unix_time'])
>>> time_df.select(timestamp_seconds(time_df.unix_time).alias('ts')).collect()
[Row(ts=datetime.datetime(2008, 12, 25, 15, 30))]
```
The expected time is **07:30 but we get 15:30**.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
By running the modified example via:
```
$ ./python/run-tests --modules=pyspark-sql
```
Closes #28959 from MaxGekk/SPARK-32088-fix-timezone-issue-followup.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
…potential conflicts in dev ### What changes were proposed in this pull request? This PR proposes to partially reverts back in the tests and some codes at #27728 without touching any behaivours. Most of changes in tests are back before #27728 by combining `withNestedDataFrame` and `withParquetDataFrame`. Basically, it addresses the comments #27728 (comment), and my own comment in another PR at #28761 (comment) ### Why are the changes needed? For maintenance purpose and to avoid a potential conflicts during backports. And also in case when other codes are matched with this. ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? Manually tested. Closes #28955 from HyukjinKwon/SPARK-25556-followup. Authored-by: HyukjinKwon <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
…US operations ### What changes were proposed in this pull request? fix error exception messages during exceptions on Union and set operations ### Why are the changes needed? Union and set operations can only be performed on tables with the compatible column types,while when we have more than two column, the exception messages will have wrong column index. Steps to reproduce: ``` drop table if exists test1; drop table if exists test2; drop table if exists test3; create table if not exists test1(id int, age int, name timestamp); create table if not exists test2(id int, age timestamp, name timestamp); create table if not exists test3(id int, age int, name int); insert into test1 select 1,2,'2020-01-01 01:01:01'; insert into test2 select 1,'2020-01-01 01:01:01','2020-01-01 01:01:01'; insert into test3 select 1,3,4; ``` Query1: ```sql select * from test1 except select * from test2; ``` Result1: ``` Error: org.apache.spark.sql.AnalysisException: Except can only be performed on tables with the compatible column types. timestamp <> int at the second column of the second table;; 'Except false :- Project [id#620, age#621, name#622] : +- SubqueryAlias `default`.`test1` : +- HiveTableRelation `default`.`test1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#620, age#621, name#622] +- Project [id#623, age#624, name#625] +- SubqueryAlias `default`.`test2` +- HiveTableRelation `default`.`test2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#623, age#624, name#625] (state=,code=0) ``` Query2: ```sql select * from test1 except select * from test3; ``` Result2: ``` Error: org.apache.spark.sql.AnalysisException: Except can only be performed on tables with the compatible column types int <> timestamp at the 2th column of the second table; ``` the above query1 has the right exception message the above query2 have the wrong errors information, it may need to change to the following ``` Error: org.apache.spark.sql.AnalysisException: Except can only be performed on tables with the compatible column types. int <> timestamp at the third column of the second table ``` ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? unit test Closes #28951 from GuoPhilipse/32131-correct-error-messages. Lead-authored-by: GuoPhilipse <[email protected]> Co-authored-by: GuoPhilipse <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…tatistics ### What changes were proposed in this pull request? Update documentation to reflect changes in faf220a I've changed the documentation to reflect updated statistics may be used to improve query plan. ### Why are the changes needed? I believe the documentation is stale and misleading. ### Does this PR introduce _any_ user-facing change? Yes, this is a javadoc documentation fix. ### How was this patch tested? Doc fix. Closes #28925 from emkornfield/spark-32095. Authored-by: Micah Kornfield <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request? Spark can't push down scan predicate condition of **Or**: Such as if I have a table `default.test`, it's partition col is `dt`, If we use query : ``` select * from default.test where dt=20190625 or (dt = 20190626 and id in (1,2,3) ) ``` In this case, Spark will resolve **Or** condition as one expression, and since this expr has reference of "id", then it can't been push down. Base on pr #28733, In my PR , for SQL like `select * from default.test` `where dt = 20190626 or (dt = 20190627 and xxx="a") ` For this condition `dt = 20190626 or (dt = 20190627 and xxx="a" )`, it will been converted to CNF ``` (dt = 20190626 or dt = 20190627) and (dt = 20190626 or xxx = "a" ) ``` then condition `dt = 20190626 or dt = 20190627` will be push down when partition pruning ### Why are the changes needed? Optimize partition pruning ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? Added UT Closes #28805 from AngersZhuuuu/cnf-for-partition-pruning. Lead-authored-by: angerszhu <[email protected]> Co-authored-by: AngersZhuuuu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…odel ### What changes were proposed in this pull request? Add summary to RandomForestClassificationModel... ### Why are the changes needed? so user can get a summary of this classification model, and retrieve common metrics such as accuracy, weightedTruePositiveRate, roc (for binary), pr curves (for binary), etc. ### Does this PR introduce _any_ user-facing change? Yes ``` RandomForestClassificationModel.summary RandomForestClassificationModel.evaluate ``` ### How was this patch tested? Add new tests Closes #28913 from huaxingao/rf_summary. Authored-by: Huaxin Gao <[email protected]> Signed-off-by: Sean Owen <[email protected]>
### What changes were proposed in this pull request? This is a followup of #28534 , to make `TIMESTAMP_SECONDS` function support fractional input as well. ### Why are the changes needed? Previously the cast function can cast fractional values to timestamp. Now we suggest users to ues these new functions, and we need to cover all the cast use cases. ### Does this PR introduce _any_ user-facing change? Yes, now `TIMESTAMP_SECONDS` function accepts fractional input. ### How was this patch tested? new tests Closes #28956 from cloud-fan/follow. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request? This PR aims to add `PrometheusServletSuite`. ### Why are the changes needed? This improves the test coverage. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the newly added test suite. Closes #28865 from erenavsarogullari/spark_driver_prometheus_metrics_improvement. Authored-by: Eren Avsarogullari <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…ctive in data source options ### What changes were proposed in this pull request? This is a followup of #28760 to fix the remaining issues: 1. should consider data source options when refreshing cache by path at the end of `InsertIntoHadoopFsRelationCommand` 2. should consider data source options when inferring schema for file source 3. should consider data source options when getting the qualified path in file source v2. ### Why are the changes needed? We didn't catch these issues in #28760, because the test case is to check error when initializing the file system. If we initialize the file system multiple times during a simple read/write action, the test case actually only test the first time. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? rewrite the test to make sure the entire data source read/write action can succeed. Closes #28948 from cloud-fan/fix. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Gengliang Wang <[email protected]>
### What changes were proposed in this pull request? Set the JSON option `inferTimestamp` to `false` if an user don't pass it as datasource option. ### Why are the changes needed? To prevent perf regression while inferring schemas from JSON with potential timestamps fields. ### Does this PR introduce _any_ user-facing change? Yes ### How was this patch tested? - Modified existing tests in `JsonSuite` and `JsonInferSchemaSuite`. - Regenerated results of `JsonBenchmark` in the environment: | Item | Description | | ---- | ----| | Region | us-west-2 (Oregon) | | Instance | r3.xlarge | | AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) | | Java | OpenJDK 64-Bit Server VM 1.8.0_252 and OpenJDK 64-Bit Server VM 11.0.7+10 | Closes #28966 from MaxGekk/json-inferTimestamps-disable-by-default. Authored-by: Max Gekk <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…nt and sql when AQE is enabled ### What changes were proposed in this pull request? As the followup of #28900, this patch extends coalescing partitions to repartitioning using hints and SQL syntax without specifying number of partitions, when AQE is enabled. ### Why are the changes needed? When repartitionning using hints and SQL syntax, we should follow the shuffling behavior of repartition by expression/range to coalesce partitions when AQE is enabled. ### Does this PR introduce _any_ user-facing change? Yes. After this change, if users don't specify the number of partitions when repartitioning using `REPARTITION`/`REPARTITION_BY_RANGE` hint or `DISTRIBUTE BY`/`CLUSTER BY`, AQE will coalesce partitions. ### How was this patch tested? Unit tests. Closes #28952 from viirya/SPARK-32056-sql. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…ui.port) property if mentioned explicitly ## What changes were proposed in this pull request? When a Spark Job launched in Cluster mode with Yarn, Application Master sets spark.ui.port port to 0 which means Driver's web UI gets any random port even if we want to explicitly set the Port range for Driver's Web UI ## Why are the changes needed? We access Spark Web UI via Knox Proxy, and there are firewall restrictions due to which we can not access Spark Web UI since Web UI port range gets random port even if we set explicitly. This Change will check if there is a specified port range explicitly mentioned so that it does not assign a random port. ## Does this PR introduce any user-facing change? No ## How was this patch tested? Local Tested. Closes #28880 from rajatahujaatinmobi/ahujarajat261/SPARK-32039-change-yarn-webui-port-range-with-property-latest-spark. Authored-by: Rajat Ahuja <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request? This patch fixes wrong groupBy result if the grouping key is a null-value struct. ### Why are the changes needed? `NormalizeFloatingNumbers` reconstructs a struct if input expression is StructType. If the input struct is null, it will reconstruct a struct with null-value fields, instead of null. ### Does this PR introduce _any_ user-facing change? Yes, fixing incorrect groupBy result. ### How was this patch tested? Unit test. Closes #28962 from viirya/SPARK-32136. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
### What changes were proposed in this pull request? Correct file seprate use in `ExecutorDiskUtils.createNormalizedInternedPathname` on Windows ### Why are the changes needed? `ExternalShuffleBlockResolverSuite` failed on Windows, see detail at: https://issues.apache.org/jira/browse/SPARK-32121 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The existed test suite. Closes #28940 from pan3793/SPARK-32121. Lead-authored-by: pancheng <[email protected]> Co-authored-by: chengpan <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
### What changes were proposed in this pull request? Minor fix so that the documentation of `getActiveSession` is fixed. The sample code snippet doesn't come up formatted rightly, added spacing for this to be fixed. Also added return to docs. ### Why are the changes needed? The sample code is getting mixed up as description in the docs. [Current Doc Link](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=getactivesession#pyspark.sql.SparkSession.getActiveSession)  ### Does this PR introduce _any_ user-facing change? Yes, documentation of getActiveSession is fixed. And added description about return. ### How was this patch tested? Adding a spacing between description and code seems to fix the issue. Closes #28978 from animenon/docs_minor. Authored-by: animenon <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
### What changes were proposed in this pull request? For queries like `t1d in (SELECT t2d FROM t2 ORDER BY t2c LIMIT 2)`, the result can be non-deterministic as the result of the subquery may output different results (it's not sorted by `t2d` and it has shuffle). This PR makes the test more robust by sorting the output column. ### Why are the changes needed? avoid flaky test ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? N/A Closes #28976 from cloud-fan/small. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
## What changes were proposed in this pull request? Check the namespace existence while calling "use namespace", and throw NoSuchNamespaceException if namespace not exists. ### Why are the changes needed? Users need to know that the namespace does not exist when they try to set a wrong namespace. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Run all suites and add a test for this Closes #27900 from stczwd/SPARK-31100. Authored-by: stczwd <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…s from in HiveExternalCatalogSuite ### What changes were proposed in this pull request? 1.Merge two similar tests for SPARK-31061 and make the code clean. 2.Fix table alter issue due to lose path. ### Why are the changes needed? Because this two tests for SPARK-31061 is very similar and could be merged. And the first test case should use `rawTable` instead of `parquetTable` to alter. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test. Closes #28980 from TJX2014/master-follow-merge-spark-31061-test-case. Authored-by: TJX2014 <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…mark ### What changes were proposed in this pull request? Set the JSON option `inferTimestamp` to `true` for the cases that measure perf of timestamp inference. ### Why are the changes needed? The PR #28966 disabled timestamp inference by default. As a consequence, some benchmarks don't measure perf of timestamp inference from JSON fields. This PR explicitly enable such inference. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By re-generating results of `JsonBenchmark`. Closes #28981 from MaxGekk/json-inferTimestamps-disable-by-default-followup. Authored-by: Max Gekk <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
… table ### What changes were proposed in this pull request? docs/sql-ref-syntax-qry-select-usedb.md -> docs/sql-ref-syntax-ddl-usedb.md docs/sql-ref-syntax-aux-refresh-table.md -> docs/sql-ref-syntax-aux-cache-refresh-table.md ### Why are the changes needed? usedb belongs to DDL. Its location should be consistent with other DDL commands file locations similar reason for refresh table ### Does this PR introduce _any_ user-facing change? before change, when clicking USE DATABASE, the side bar menu shows select commands <img width="1200" alt="Screen Shot 2020-07-04 at 9 05 35 AM" src="https://user-images.githubusercontent.com/13592258/86516696-b45f8a80-bdd7-11ea-8dba-3a5cca22aad3.png"> after change, when clicking USE DATABASE, the side bar menu shows DDL commands <img width="1120" alt="Screen Shot 2020-07-04 at 9 06 06 AM" src="https://user-images.githubusercontent.com/13592258/86516703-bf1a1f80-bdd7-11ea-8a90-ae7eaaafd44c.png"> before change, when clicking refresh table, the side bar menu shows Auxiliary statements <img width="1200" alt="Screen Shot 2020-07-04 at 9 30 40 AM" src="https://user-images.githubusercontent.com/13592258/86516877-3d2af600-bdd9-11ea-9568-0a6f156f57da.png"> after change, when clicking refresh table, the side bar menu shows Cache statements <img width="1199" alt="Screen Shot 2020-07-04 at 9 35 21 AM" src="https://user-images.githubusercontent.com/13592258/86516937-b4f92080-bdd9-11ea-8ad1-5f5a7f58d76b.png"> ### How was this patch tested? Manually build and check Closes #28995 from huaxingao/docs_fix. Authored-by: Huaxin Gao <[email protected]> Signed-off-by: Huaxin Gao <[email protected]>
…n mouseover in the WebUI ### What changes were proposed in this pull request? This PR changes `webui.css` to fix a style issue on moving mouse cursor on the Spark logo. ### Why are the changes needed? In the webui, the Spark logo is on the top right side. When we move mouse cursor on the logo, a weird underline appears near the logo. <img width="209" alt="logo_with_line" src="https://user-images.githubusercontent.com/4736016/86542828-3c6a9f00-bf54-11ea-9b9d-cc50c12c2c9b.png"> ### Does this PR introduce _any_ user-facing change? Yes. After this change applied, no more weird line shown even if mouse cursor moves on the logo. <img width="207" alt="removed-line-from-logo" src="https://user-images.githubusercontent.com/4736016/86542877-98cdbe80-bf54-11ea-8695-ee39689673ab.png"> ### How was this patch tested? By moving mouse cursor on the Spark logo and confirmed no more weird line there. Closes #29003 from sarutak/fix-logo-underline. Authored-by: Kousuke Saruta <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request? This PR aims to disable dependency tests(test-dependencies.sh) from Jenkins. ### Why are the changes needed? - First of all, GitHub Action provides the same test capability already and stabler. - Second, currently, `test-dependencies.sh` fails very frequently in AmpLab Jenkins environment. For example, in the following irrelevant PR, it fails 5 times during 6 hours. - #29001 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the Jenkins without `test-dependencies.sh` invocation. Closes #29004 from dongjoon-hyun/SPARK-32178. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
### What changes were proposed in this pull request? This PR aims to reduce the required test resources in WorkerDecommissionExtendedSuite. ### Why are the changes needed? When Jenkins farms is crowded, the following failure happens currently [here](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-3.2-hive-2.3/890/testReport/junit/org.apache.spark.scheduler/WorkerDecommissionExtendedSuite/Worker_decommission_and_executor_idle_timeout/) ``` java.util.concurrent.TimeoutException: Can't find 20 executors before 60000 milliseconds elapsed at org.apache.spark.TestUtils$.waitUntilExecutorsUp(TestUtils.scala:326) at org.apache.spark.scheduler.WorkerDecommissionExtendedSuite.$anonfun$new$2(WorkerDecommissionExtendedSuite.scala:45) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the Jenkins. Closes #29001 from dongjoon-hyun/SPARK-32100-2. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…OperationStatus should include exception's stack trace to the error message ### What changes were proposed in this pull request? In https://issues.apache.org/jira/browse/SPARK-29283, we only show the error message of root cause to end-users through JDBC client. In some cases, it erases the straightaway messages that we intentionally make to help them for better understanding. The root cause is somehow obscure for JDBC end-users who only writing SQL queries. e.g ``` Error running query: org.apache.spark.sql.AnalysisException: The second argument of 'date_sub' function needs to be an integer.; ``` is better than just ``` Caused by: java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2 ``` We should do as Hive does in https://issues.apache.org/jira/browse/HIVE-14368 In general, this PR partially reverts SPARK-29283, ports HIVE-14368, and improves test coverage ### Why are the changes needed? 1. Do the same as Hive 2.3 and later for getting an error message in ThriftCLIService.GetOperationStatus 2. The root cause is somehow obscure for JDBC end-users who only writing SQL queries. 3. Consistency with `spark-sql` script ### Does this PR introduce _any_ user-facing change? Yes, when running queries using thrift server and an error occurs, you will get the full stack traces instead of only the message of the root cause ### How was this patch tested? add unit test Closes #28963 from yaooqinn/SPARK-32145. Authored-by: Kent Yao <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…map test with window
### What changes were proposed in this pull request?
Improve the error message in test GroupedMapInPandasTests.test_grouped_over_window_with_key to show the incorrect values.
### Why are the changes needed?
This test failure has come up often in Arrow testing because it tests a struct with timestamp values through a Pandas UDF. The current error message is not helpful as it doesn't show the incorrect values, only that it failed. This change will instead raise an assertion error with the incorrect values on a failure.
Before:
```
======================================================================
FAIL: test_grouped_over_window_with_key (pyspark.sql.tests.test_pandas_grouped_map.GroupedMapInPandasTests)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/spark/python/pyspark/sql/tests/test_pandas_grouped_map.py", line 588, in test_grouped_over_window_with_key
self.assertTrue(all([r[0] for r in result]))
AssertionError: False is not true
```
After:
```
======================================================================
ERROR: test_grouped_over_window_with_key (pyspark.sql.tests.test_pandas_grouped_map.GroupedMapInPandasTests)
----------------------------------------------------------------------
...
AssertionError: {'start': datetime.datetime(2018, 3, 20, 0, 0), 'end': datetime.datetime(2018, 3, 25, 0, 0)}, != {'start': datetime.datetime(2020, 3, 20, 0, 0), 'end': datetime.datetime(2020, 3, 25, 0, 0)}
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Improved existing test
Closes #28987 from BryanCutler/pandas-grouped-map-test-output-SPARK-32162.
Authored-by: Bryan Cutler <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
### What changes were proposed in this pull request? Use Files.createDirectory() to create local directory instead of File.mkdir() in DiskBlockManager. Many times, we will see such error log information like "Failed to create local dir in xxxxxx". But there is no clear information indicating why the directory creation failed. When Files.createDirectory() fails to create a local directory, it can give specific error information for subsequent troubleshooting(also throws IOException). ### Why are the changes needed? Throw clear error message when creating directory fails. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? `DiskBlockManagerSuite` Closes #28997 from sidedoorleftroad/SPARK-32172. Authored-by: sidedoorleftroad <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…nullability together ### What changes were proposed in this pull request? Fix nullability of `GetArrayStructFields`. It should consider both the original array's `containsNull` and the inner field's nullability. ### Why are the changes needed? Fix a correctness issue. ### Does this PR introduce _any_ user-facing change? Yes. See the added test. ### How was this patch tested? a new UT and end-to-end test Closes #28992 from cloud-fan/bug. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
… without WindowExpression
### What changes were proposed in this pull request?
Add WindowFunction check at `CheckAnalysis`.
### Why are the changes needed?
Provide friendly error msg.
**BEFORE**
```scala
scala> sql("select rank() from values(1)").show
java.lang.UnsupportedOperationException: Cannot generate code for expression: rank()
```
**AFTER**
```scala
scala> sql("select rank() from values(1)").show
org.apache.spark.sql.AnalysisException: Window function rank() requires an OVER clause.;;
Project [rank() AS RANK()#3]
+- LocalRelation [col1#2]
```
### Does this PR introduce _any_ user-facing change?
Yes, user wiill be given a better error msg.
### How was this patch tested?
Pass the newly added UT.
Closes #28808 from ulysses-you/SPARK-31975.
Authored-by: ulysses <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request? Replace the combination of expressions `SecondsToTimestamp` and `UnixTimestamp` by `GetTimestamp` in `ParseToDate`. ### Why are the changes needed? Eliminate unnecessary parsing overhead in: **string -> timestamp -> long (seconds) -> timestamp -> date**. After the changes, the chain will be: **string -> timestamp -> date**. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By existing test suites such as `DateFunctionsSuite`. Closes #28999 from MaxGekk/ParseToDate-parse-timestamp. Authored-by: Max Gekk <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request? Document the stage level scheduling feature. ### Why are the changes needed? Document the stage level scheduling feature. ### Does this PR introduce _any_ user-facing change? Documentation. ### How was this patch tested? n/a docs only Closes #29292 from tgravescs/SPARK-30322. Authored-by: Thomas Graves <[email protected]> Signed-off-by: Thomas Graves <[email protected]>
### What changes were proposed in this pull request? This PR adds abstract classes for shuffle and broadcast, so that users can provide their columnar implementations. This PR updates several places to use the abstract exchange classes, and also update `AdaptiveSparkPlanExec` so that the columnar rules can see exchange nodes. This is an alternative of #29134 . Close #29134 ### Why are the changes needed? To allow columnar exchanges. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new tests Closes #29262 from cloud-fan/columnar. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Thomas Graves <[email protected]>
…consistent between modules ### What changes were proposed in this pull request? Upgrade codehaus maven build helper to allow people to specify a time during the build to avoid snapshot artifacts with different version strings. ### Why are the changes needed? During builds of snapshots the maven may assign different versions to different artifacts based on the time each individual sub-module starts building. The timestamp is used as part of the version string when run `maven deploy` on a snapshot build. This results in different sub-modules having different version strings. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manual build while specifying the current time, ensured the time is consistent in the sub components. Open question: Ideally I'd like to backport this as well since it's sort of a bug fix and while it does change a dependency version it's not one that is propagated. I'd like to hear folks thoughts about this. Closes #29274 from holdenk/SPARK-32397-snapshot-artifact-timestamp-differences. Authored-by: Holden Karau <[email protected]> Signed-off-by: DB Tsai <[email protected]>
…Stages|OneApplication]Resource ### What changes were proposed in this pull request? This PR aims to remove `java.ws.rs.NotFoundException` from two problematic `import` statements. All the other use cases are correct. ### Why are the changes needed? In `StagesResource` and `OneApplicationResource`, there exist two `NotFoundException`s. - javax.ws.rs.NotFoundException - org.apache.spark.status.api.v1.NotFoundException To use `org.apache.spark.status.api.v1.NotFoundException` correctly, we should not import `java.ws.rs.NotFoundException`. This causes UT failures in Scala 2.13 environment. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Scala 2.12: Pass the GitHub Action or Jenkins. - Scala 2.13: Do the following manually. ``` $ dev/change-scala-version.sh 2.13 $ build/mvn test -pl core --am -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.deploy.history.HistoryServerSuite ``` **BEFORE** ``` *** 4 TESTS FAILED *** ``` **AFTER** ``` *** 1 TEST FAILED *** ``` Closes #29293 from dongjoon-hyun/SPARK-32487. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request? This PR aims to recover Java 11 build in `GitHub Action`. ### Why are the changes needed? This test coverage is removed before. Now, it's time to recover it. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the GitHub Action. Closes #29295 from dongjoon-hyun/SPARK-32248. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…s and fixing a thread leak issue in pinned thread mode
### What changes were proposed in this pull request?
This PR proposes:
1. To introduce `InheritableThread` class, that works identically with `threading.Thread` but it can inherit the inheritable attributes of a JVM thread such as `InheritableThreadLocal`.
This was a problem from the pinned thread mode, see also #24898. Now it works as below:
```python
import pyspark
spark.sparkContext.setLocalProperty("a", "hi")
def print_prop():
print(spark.sparkContext.getLocalProperty("a"))
pyspark.InheritableThread(target=print_prop).start()
```
```
hi
```
2. Also, it adds the resource leak fix into `InheritableThread`. Py4J leaks the thread and does not close the connection from Python to JVM. In `InheritableThread`, it manually closes the connections when PVM garbage collection happens. So, JVM threads finish safely. I manually verified by profiling but there's also another easy way to verify:
```bash
PYSPARK_PIN_THREAD=true ./bin/pyspark
```
```python
>>> from threading import Thread
>>> Thread(target=lambda: spark.range(1000).collect()).start()
>>> Thread(target=lambda: spark.range(1000).collect()).start()
>>> Thread(target=lambda: spark.range(1000).collect()).start()
>>> spark._jvm._gateway_client.deque
deque([<py4j.clientserver.ClientServerConnection object at 0x119f7aba8>, <py4j.clientserver.ClientServerConnection object at 0x119fc9b70>, <py4j.clientserver.ClientServerConnection object at 0x119fc9e10>, <py4j.clientserver.ClientServerConnection object at 0x11a015358>, <py4j.clientserver.ClientServerConnection object at 0x119fc00f0>])
>>> Thread(target=lambda: spark.range(1000).collect()).start()
>>> spark._jvm._gateway_client.deque
deque([<py4j.clientserver.ClientServerConnection object at 0x119f7aba8>, <py4j.clientserver.ClientServerConnection object at 0x119fc9b70>, <py4j.clientserver.ClientServerConnection object at 0x119fc9e10>, <py4j.clientserver.ClientServerConnection object at 0x11a015358>, <py4j.clientserver.ClientServerConnection object at 0x119fc08d0>, <py4j.clientserver.ClientServerConnection object at 0x119fc00f0>])
```
This issue is fixed now.
3. Because now we have a fix for the issue here, it also proposes to deprecate `collectWithJobGroup` which was a temporary workaround added to avoid this leak issue.
### Why are the changes needed?
To support pinned thread mode properly without a resource leak, and a proper inheritable local properties.
### Does this PR introduce _any_ user-facing change?
Yes, it adds an API `InheritableThread` class for pinned thread mode.
### How was this patch tested?
Manually tested as described above, and unit test was added as well.
Closes #28968 from HyukjinKwon/SPARK-32010.
Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
### What changes were proposed in this pull request? for binary `LogisticRegressionModel`: 1, keep variables `_threshold` and `_rawThreshold` instead of computing them on each instance; 2, in `raw2probabilityInPlace`, make use of the characteristic that the sum of probability is 1.0; ### Why are the changes needed? for better performance ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing testsuite and performace test in REPL Closes #29255 from zhengruifeng/pred_opt. Authored-by: zhengruifeng <[email protected]> Signed-off-by: Huaxin Gao <[email protected]>
…lt datasources ### What changes were proposed in this pull request? When `spark.sql.caseSensitive` is `false` (by default), check that there are not duplicate column names on the same level (top level or nested levels) in reading from in-built datasources Parquet, ORC, Avro and JSON. If such duplicate columns exist, throw the exception: ``` org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema: ``` ### Why are the changes needed? To make handling of duplicate nested columns is similar to handling of duplicate top-level columns i. e. output the same error when `spark.sql.caseSensitive` is `false`: ```Scala org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema: `camelcase` ``` Checking of top-level duplicates was introduced by #17758. ### Does this PR introduce _any_ user-facing change? Yes. For the example from SPARK-32431: ORC: ```scala java.io.IOException: Error reading file: file:/private/var/folders/p3/dfs6mf655d7fnjrsjvldh0tc0000gn/T/spark-c02c2f9a-0cdc-4859-94fc-b9c809ca58b1/part-00001-63e8c3f0-7131-4ec9-be02-30b3fdd276f4-c000.snappy.orc at org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1329) at org.apache.orc.mapreduce.OrcMapreduceRecordReader.ensureBatch(OrcMapreduceRecordReader.java:78) ... Caused by: java.io.EOFException: Read past end of RLE integer from compressed stream Stream for column 3 kind DATA position: 6 length: 6 range: 0 offset: 12 limit: 12 range 0 = 0 to 6 uncompressed: 3 to 3 at org.apache.orc.impl.RunLengthIntegerReaderV2.readValues(RunLengthIntegerReaderV2.java:61) at org.apache.orc.impl.RunLengthIntegerReaderV2.next(RunLengthIntegerReaderV2.java:323) ``` JSON: ```scala +------------+ |StructColumn| +------------+ | [,,]| +------------+ ``` Parquet: ```scala +------------+ |StructColumn| +------------+ | [0,, 1]| +------------+ ``` Avro: ```scala +------------+ |StructColumn| +------------+ | [,,]| +------------+ ``` After the changes, Parquet, ORC, JSON and Avro output the same error: ```scala Found duplicate column(s) in the data schema: `camelcase`; org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema: `camelcase`; at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:112) at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:51) at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:67) ``` ### How was this patch tested? Run modified test suites: ``` $ build/sbt "sql/test:testOnly org.apache.spark.sql.FileBasedDataSourceSuite" $ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.*" ``` and added new UT to `SchemaUtilsSuite`. Closes #29234 from MaxGekk/nested-case-insensitive-column. Authored-by: Max Gekk <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…pply with Arrow vectorization
### What changes were proposed in this pull request?
This PR proposes to:
1. Fix the error message when the output schema is misbatched with R DataFrame from the given function. For example,
```R
df <- createDataFrame(list(list(a=1L, b="2")))
count(gapply(df, "a", function(key, group) { group }, structType("a int, b int")))
```
**Before:**
```
Error in handleErrors(returnStatus, conn) :
...
java.lang.UnsupportedOperationException
...
```
**After:**
```
Error in handleErrors(returnStatus, conn) :
...
java.lang.AssertionError: assertion failed: Invalid schema from gapply: expected IntegerType, IntegerType, got IntegerType, StringType
...
```
2. Update documentation about the schema matching for `gapply` and `dapply`.
### Why are the changes needed?
To show which schema is not matched, and let users know what's going on.
### Does this PR introduce _any_ user-facing change?
Yes, error message is updated as above, and documentation is updated.
### How was this patch tested?
Manually tested and unitttests were added.
Closes #29283 from HyukjinKwon/r-vectorized-error.
Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
…tions ### What changes were proposed in this pull request? Log error/warn message only once at the server-side for both sync and async modes ### Why are the changes needed? In b151194 we make the error logging for SparkExecuteStatementOperation with `runInBackground=true` not duplicated, but the operations with runInBackground=false and other metadata operation still will be log twice which happened in the operation's `runInternal` method and ThriftCLIService. In this PR, I propose to reflect the logic to get a unified error handling approach. ### Does this PR introduce _any_ user-facing change? Yes, when spark.sql.hive.thriftServer.async=false and people call sync APIs the error message will be logged only once at server-side. ### How was this patch tested? locally verified the result in target/unit-test.log add unit tests. Closes #29204 from yaooqinn/SPARK-32412. Authored-by: Kent Yao <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…generating unused code ### What changes were proposed in this pull request? This PR aims to update `SqlBse.g4` for avoiding generating unused code. Currently, ANTLR generates unused methods and variables; `isValidDecimal` and `isHint` are only used in the generated lexer. This PR changed the code to use `parser::members` and `lexer::members` to avoid it. ### Why are the changes needed? To reduce unnecessary code. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #29296 from maropu/UpdateSqlBase. Authored-by: Takeshi Yamamuro <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…ng script ### What changes were proposed in this pull request? This PR proposes to skip SparkR installation that is to run R linters (see SPARK-8505) in the test-only mode at `dev/run-tests.py` script. As of SPARK-32292, the test-only mode in `dev/run-tests.py` was introduced, for example: ``` dev/run-tests.py --modules sql,core ``` which only runs the relevant tests and does not run other tests such as linters. Therefore, we don't need to install SparkR when `--modules` are specified. ### Why are the changes needed? GitHub Actions build is currently failed as below: ``` ERROR: this R is version 3.4.4, package 'SparkR' requires R >= 3.5 [error] running /home/runner/work/spark/spark/R/install-dev.sh ; received return code 1 ##[error]Process completed with exit code 10. ``` For some reasons, looks GitHub Actions started to have R 3.4.4 installed by default; however, R 3.4 was dropped as of SPARK-32073. When SparkR tests are not needed, GitHub Actions still builds SparkR with a low R version and it causes the test failure. This PR partially fixes it by avoid the installation of SparkR. ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? GitHub Actions tests should run to confirm this fix is correct. Closes #29300 from HyukjinKwon/install-r. Authored-by: HyukjinKwon <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
…itHub Actions ### What changes were proposed in this pull request? This PR proposes to manually install R instead of using `setup-r` which seems broken. Currently, GitHub Actions uses its default R 3.4.4 installed, which we dropped as of SPARK-32073. While I am here, I am also upgrading R version to 4.0. Jenkins will test the old version and GitHub Actions tests the new version. AppVeyor uses R 4.0 but it does not check CRAN which is important when we make a release. ### Why are the changes needed? To recover GitHub Actions build. ### Does this PR introduce _any_ user-facing change? No, dev-only ### How was this patch tested? Manually tested at HyukjinKwon#15 Closes #29302 from HyukjinKwon/SPARK-32493. Authored-by: HyukjinKwon <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
…ting ### What changes were proposed in this pull request? #26556 excluded `.github/workflows/master.yml`. So tests are skipped if the GitHub Actions configuration file is changed. As of SPARK-32245, we now run the regular tests via the testing script. We should include it to test to make sure GitHub Actions build does not break due to some changes such as Python versions. ### Why are the changes needed? For better test coverage in GitHub Actions build. ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? GitHub Actions in this PR will test. Closes #29305 from HyukjinKwon/SPARK-32496. Authored-by: HyukjinKwon <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
### What changes were proposed in this pull request? Fix regression bug in load-spark-env.cmd with Spark 3.0.0 ### Why are the changes needed? cmd doesn't support set env twice. So set `SPARK_ENV_CMD=%SPARK_CONF_DIR%\%SPARK_ENV_CMD%` doesn't take effect, which caused regression. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manually tested. 1. Create a spark-env.cmd under conf folder. Inside this, `echo spark-env.cmd` 2. Run old load-spark-env.cmd, nothing printed in the output 2. Run fixed load-spark-env.cmd, `spark-env.cmd` showed in the output. Closes #29044 from warrenzhu25/32227. Lead-authored-by: Warren Zhu <[email protected]> Co-authored-by: Warren Zhu <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
…ctions ### What changes were proposed in this pull request? CRAN check fails due to the size of the generated PDF docs as below: ``` ... WARNING ‘qpdf’ is needed for checks on size reduction of PDFs ... Status: 1 WARNING, 1 NOTE See ‘/home/runner/work/spark/spark/R/SparkR.Rcheck/00check.log’ for details. ``` This PR proposes to install `qpdf` in GitHub Actions. Note that I cannot reproduce in my local with the same R version so I am not documenting it for now. Also, while I am here, I piggyback to install SparkR when the module includes `sparkr`. it is rather a followup of SPARK-32491. ### Why are the changes needed? To fix SparkR CRAN check failure. ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? GitHub Actions will test it out. Closes #29306 from HyukjinKwon/SPARK-32497. Authored-by: HyukjinKwon <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
### What changes were proposed in this pull request? So far, we fixed many stuffs in `core` module. This PR fixes the remaining UT failures in Scala 2.13. - `OneApplicationResource.environmentInfo` will return a deterministic result for `sparkProperties`, `hadoopProperties`, `systemProperties`, and `classpathEntries`. - `SubmitRestProtocolSuite` has Scala 2.13 answer in addition to the existing Scala 2.12 answer, and uses the expected answer based on the Scala runtime version. ### Why are the changes needed? To support Scala 2.13. ### Does this PR introduce _any_ user-facing change? Yes, `environmentInfo` is changed, but this fixes the indeterministic behavior. ### How was this patch tested? - Scala 2.12: Pass the Jenkins or GitHub Action - Scala 2.13: Do the following. ``` $ dev/change-scala-version.sh 2.13 $ build/mvn test -pl core --am -Pscala-2.13 ``` **BEFORE** ``` Tests: succeeded 2612, failed 3, canceled 1, ignored 8, pending 0 *** 3 TESTS FAILED *** ``` **AFTER** ``` Tests: succeeded 2615, failed 0, canceled 1, ignored 8, pending 0 All tests passed. ``` Closes #29298 from dongjoon-hyun/SPARK-32489. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request? This PR reduces the prospect of a job loss during decommissioning. It fixes two holes in the current decommissioning framework: - (a) Loss of decommissioned executors is not treated as a job failure: We know that the decommissioned executor would be dying soon, so its death is clearly not caused by the application. - (b) Shuffle files on the decommissioned host are cleared when the first fetch failure is detected from a decommissioned host: This is a bit tricky in terms of when to clear the shuffle state ? Ideally you want to clear it the millisecond before the shuffle service on the node dies (or the executor dies when there is no external shuffle service) -- too soon and it could lead to some wastage and too late would lead to fetch failures. The approach here is to do this clearing when the very first fetch failure is observed on the decommissioned block manager, without waiting for other blocks to also signal a failure. ### Why are the changes needed? Without them decommissioning a lot of executors at a time leads to job failures. ### Code overview The task scheduler tracks the executors that were decommissioned along with their `ExecutorDecommissionInfo`. This information is used by: (a) For handling a `ExecutorProcessLost` error, or (b) by the `DAGScheduler` when handling a fetch failure. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a new unit test `DecommissionWorkerSuite` to test the new behavior by exercising the Master-Worker decommissioning. I chose to add a new test since the setup logic was quite different from the existing `WorkerDecommissionSuite`. I am open to changing the name of the newly added test suite :-) ### Questions for reviewers - Should I add a feature flag to guard these two behaviors ? They seem safe to me that they should only get triggered by decommissioning, but you never know :-). Closes #29014 from agrawaldevesh/decom_harden. Authored-by: Devesh Agrawal <[email protected]> Signed-off-by: Holden Karau <[email protected]>
### What changes were proposed in this pull request? This test tries to fix the flakyness of BlockManagerDecommissionIntegrationSuite. ### Description of the problem Make the block manager decommissioning test be less flaky An interesting failure happens when migrateDuring = true (and persist or shuffle is true): - We schedule the job with tasks on executors 0, 1, 2. - We wait 300 ms and decommission executor 0. - If the task is not yet done on executor 0, it will now fail because the block manager won't be able to save the block. This condition is easy to trigger on a loaded machine where the github checks run. - The task with retry on a different executor (1 or 2) and its shuffle blocks will land there. - No actual block migration happens here because the decommissioned executor technically failed before it could even produce a block. To remove the above race, this change replaces the fixed wait for 300 ms to wait for an actual task to succeed. When a task has succeeded, we know its blocks would have been written for sure and thus its executor would certainly be forced to migrate those blocks when it is decommissioned. The change always decommissions an executor on which a real task finished successfully instead of picking the first executor. Because the system may choose to schedule nothing on the first executor and instead run the two tasks on one executor. ### Why are the changes needed? I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it. ### Does this PR introduce _any_ user-facing change? No, unit test only change. ### How was this patch tested? Github checks. Ran this test 100 times, 10 at a time in parallel in a script. Closes #29226 from agrawaldevesh/block-manager-decom-flaky. Authored-by: Devesh Agrawal <[email protected]> Signed-off-by: Holden Karau <[email protected]>
### What changes were proposed in this pull request? This PR removes a test added in SPARK-32175(#29002). ### Why are the changes needed? That test is flaky. It can be mitigated by increasing the timeout but it would rather be simpler to remove the test. See also the [discussion](#29002 (comment)). ### Does this PR introduce _any_ user-facing change? No. Closes #29314 from sarutak/remove-flaky-test. Authored-by: Kousuke Saruta <[email protected]> Signed-off-by: Kousuke Saruta <[email protected]>
… avoid infinite wait in tests ### What changes were proposed in this pull request? Structured Streaming Kafka connector tests are now using a deprecated `poll(long)` API which could cause infinite wait. In this PR I've eliminated these calls and replaced them with `AdminClient`. ### Why are the changes needed? Deprecated `poll(long)` API calls. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unit tests. Closes #29289 from gaborgsomogyi/SPARK-32482. Authored-by: Gabor Somogyi <[email protected]> Signed-off-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
### What changes were proposed in this pull request? Adding codegen for shuffled hash join. Shuffled hash join codegen is very similar to broadcast hash join codegen. So most of code change is to refactor existing codegen in `BroadcastHashJoinExec` to `HashJoin`. Example codegen for query in [`JoinBenchmark`](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala#L153): ``` def shuffleHashJoin(): Unit = { val N: Long = 4 << 20 withSQLConf( SQLConf.SHUFFLE_PARTITIONS.key -> "2", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000000", SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { codegenBenchmark("shuffle hash join", N) { val df1 = spark.range(N).selectExpr(s"id as k1") val df2 = spark.range(N / 3).selectExpr(s"id * 3 as k2") val df = df1.join(df2, col("k1") === col("k2")) assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined) df.noop() } } } ``` Shuffled hash join codegen: ``` == Subtree 3 / 3 (maxMethodCodeSize:113; maxConstantPoolSize:126(0.19% used); numInnerClasses:0) == *(3) ShuffledHashJoin [k1#2L], [k2#6L], Inner, BuildRight :- *(1) Project [id#0L AS k1#2L] : +- *(1) Range (0, 4194304, step=1, splits=1) +- *(2) Project [(id#4L * 3) AS k2#6L] +- *(2) Range (0, 1398101, step=1, splits=1) Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage3(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=3 /* 006 */ final class GeneratedIteratorForCodegenStage3 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private scala.collection.Iterator inputadapter_input_0; /* 010 */ private org.apache.spark.sql.execution.joins.HashedRelation shj_relation_0; /* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] shj_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1]; /* 012 */ /* 013 */ public GeneratedIteratorForCodegenStage3(Object[] references) { /* 014 */ this.references = references; /* 015 */ } /* 016 */ /* 017 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 018 */ partitionIndex = index; /* 019 */ this.inputs = inputs; /* 020 */ inputadapter_input_0 = inputs[0]; /* 021 */ shj_relation_0 = ((org.apache.spark.sql.execution.joins.ShuffledHashJoinExec) references[0] /* plan */).buildHashedRelation(inputs[1]); /* 022 */ shj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0); /* 023 */ /* 024 */ } /* 025 */ /* 026 */ private void shj_doConsume_0(InternalRow inputadapter_row_0, long shj_expr_0_0) throws java.io.IOException { /* 027 */ // generate join key for stream side /* 028 */ /* 029 */ // find matches from HashRelation /* 030 */ scala.collection.Iterator shj_matches_0 = false ? /* 031 */ null : (scala.collection.Iterator)shj_relation_0.get(shj_expr_0_0); /* 032 */ if (shj_matches_0 != null) { /* 033 */ while (shj_matches_0.hasNext()) { /* 034 */ UnsafeRow shj_matched_0 = (UnsafeRow) shj_matches_0.next(); /* 035 */ { /* 036 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* numOutputRows */).add(1); /* 037 */ /* 038 */ long shj_value_1 = shj_matched_0.getLong(0); /* 039 */ shj_mutableStateArray_0[0].reset(); /* 040 */ /* 041 */ shj_mutableStateArray_0[0].write(0, shj_expr_0_0); /* 042 */ /* 043 */ shj_mutableStateArray_0[0].write(1, shj_value_1); /* 044 */ append((shj_mutableStateArray_0[0].getRow()).copy()); /* 045 */ /* 046 */ } /* 047 */ } /* 048 */ } /* 049 */ /* 050 */ } /* 051 */ /* 052 */ protected void processNext() throws java.io.IOException { /* 053 */ while ( inputadapter_input_0.hasNext()) { /* 054 */ InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next(); /* 055 */ /* 056 */ long inputadapter_value_0 = inputadapter_row_0.getLong(0); /* 057 */ /* 058 */ shj_doConsume_0(inputadapter_row_0, inputadapter_value_0); /* 059 */ if (shouldStop()) return; /* 060 */ } /* 061 */ } /* 062 */ /* 063 */ } ``` Broadcast hash join codegen for the same query (for reference here): ``` == Subtree 2 / 2 (maxMethodCodeSize:280; maxConstantPoolSize:218(0.33% used); numInnerClasses:0) == *(2) BroadcastHashJoin [k1#2L], [k2#6L], Inner, BuildRight, false :- *(2) Project [id#0L AS k1#2L] : +- *(2) Range (0, 4194304, step=1, splits=1) +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#22] +- *(1) Project [(id#4L * 3) AS k2#6L] +- *(1) Range (0, 1398101, step=1, splits=1) Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage2(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=2 /* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private boolean range_initRange_0; /* 010 */ private long range_nextIndex_0; /* 011 */ private TaskContext range_taskContext_0; /* 012 */ private InputMetrics range_inputMetrics_0; /* 013 */ private long range_batchEnd_0; /* 014 */ private long range_numElementsTodo_0; /* 015 */ private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation_0; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4]; /* 017 */ /* 018 */ public GeneratedIteratorForCodegenStage2(Object[] references) { /* 019 */ this.references = references; /* 020 */ } /* 021 */ /* 022 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 023 */ partitionIndex = index; /* 024 */ this.inputs = inputs; /* 025 */ /* 026 */ range_taskContext_0 = TaskContext.get(); /* 027 */ range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics(); /* 028 */ range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 029 */ range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 030 */ range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 031 */ /* 032 */ bhj_relation_0 = ((org.apache.spark.sql.execution.joins.LongHashedRelation) ((org.apache.spark.broadcast.TorrentBroadcast) references[1] /* broadcast */).value()).asReadOnlyCopy(); /* 033 */ incPeakExecutionMemory(bhj_relation_0.estimatedSize()); /* 034 */ /* 035 */ range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0); /* 036 */ /* 037 */ } /* 038 */ /* 039 */ private void initRange(int idx) { /* 040 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx); /* 041 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L); /* 042 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(4194304L); /* 043 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L); /* 044 */ java.math.BigInteger start = java.math.BigInteger.valueOf(0L); /* 045 */ long partitionEnd; /* 046 */ /* 047 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start); /* 048 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 049 */ range_nextIndex_0 = Long.MAX_VALUE; /* 050 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 051 */ range_nextIndex_0 = Long.MIN_VALUE; /* 052 */ } else { /* 053 */ range_nextIndex_0 = st.longValue(); /* 054 */ } /* 055 */ range_batchEnd_0 = range_nextIndex_0; /* 056 */ /* 057 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice) /* 058 */ .multiply(step).add(start); /* 059 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 060 */ partitionEnd = Long.MAX_VALUE; /* 061 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 062 */ partitionEnd = Long.MIN_VALUE; /* 063 */ } else { /* 064 */ partitionEnd = end.longValue(); /* 065 */ } /* 066 */ /* 067 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract( /* 068 */ java.math.BigInteger.valueOf(range_nextIndex_0)); /* 069 */ range_numElementsTodo_0 = startToEnd.divide(step).longValue(); /* 070 */ if (range_numElementsTodo_0 < 0) { /* 071 */ range_numElementsTodo_0 = 0; /* 072 */ } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) { /* 073 */ range_numElementsTodo_0++; /* 074 */ } /* 075 */ } /* 076 */ /* 077 */ private void bhj_doConsume_0(long bhj_expr_0_0) throws java.io.IOException { /* 078 */ // generate join key for stream side /* 079 */ /* 080 */ // find matches from HashedRelation /* 081 */ UnsafeRow bhj_matched_0 = false ? null: (UnsafeRow)bhj_relation_0.getValue(bhj_expr_0_0); /* 082 */ if (bhj_matched_0 != null) { /* 083 */ { /* 084 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1); /* 085 */ /* 086 */ long bhj_value_2 = bhj_matched_0.getLong(0); /* 087 */ range_mutableStateArray_0[3].reset(); /* 088 */ /* 089 */ range_mutableStateArray_0[3].write(0, bhj_expr_0_0); /* 090 */ /* 091 */ range_mutableStateArray_0[3].write(1, bhj_value_2); /* 092 */ append((range_mutableStateArray_0[3].getRow())); /* 093 */ /* 094 */ } /* 095 */ } /* 096 */ /* 097 */ } /* 098 */ /* 099 */ protected void processNext() throws java.io.IOException { /* 100 */ // initialize Range /* 101 */ if (!range_initRange_0) { /* 102 */ range_initRange_0 = true; /* 103 */ initRange(partitionIndex); /* 104 */ } /* 105 */ /* 106 */ while (true) { /* 107 */ if (range_nextIndex_0 == range_batchEnd_0) { /* 108 */ long range_nextBatchTodo_0; /* 109 */ if (range_numElementsTodo_0 > 1000L) { /* 110 */ range_nextBatchTodo_0 = 1000L; /* 111 */ range_numElementsTodo_0 -= 1000L; /* 112 */ } else { /* 113 */ range_nextBatchTodo_0 = range_numElementsTodo_0; /* 114 */ range_numElementsTodo_0 = 0; /* 115 */ if (range_nextBatchTodo_0 == 0) break; /* 116 */ } /* 117 */ range_batchEnd_0 += range_nextBatchTodo_0 * 1L; /* 118 */ } /* 119 */ /* 120 */ int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L); /* 121 */ for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) { /* 122 */ long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0; /* 123 */ /* 124 */ bhj_doConsume_0(range_value_0); /* 125 */ /* 126 */ if (shouldStop()) { /* 127 */ range_nextIndex_0 = range_value_0 + 1L; /* 128 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1); /* 129 */ range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1); /* 130 */ return; /* 131 */ } /* 132 */ /* 133 */ } /* 134 */ range_nextIndex_0 = range_batchEnd_0; /* 135 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0); /* 136 */ range_inputMetrics_0.incRecordsRead(range_localEnd_0); /* 137 */ range_taskContext_0.killTaskIfInterrupted(); /* 138 */ } /* 139 */ } /* 140 */ /* 141 */ } ``` ### Why are the changes needed? Codegen shuffled hash join can help save CPU cost. We added shuffled hash join codegen internally in our fork, and seeing obvious improvement in benchmark compared to current non-codegen code path. Test example query in [`JoinBenchmark`](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala#L153), seeing 30% wall clock time improvement compared to existing non-codegen code path: Enable shuffled hash join code-gen: ``` Running benchmark: shuffle hash join Running case: shuffle hash join wholestage off Stopped after 2 iterations, 1358 ms Running case: shuffle hash join wholestage on Stopped after 5 iterations, 2323 ms Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4 Intel(R) Core(TM) i9-9980HK CPU 2.40GHz shuffle hash join: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ shuffle hash join wholestage off 649 679 43 6.5 154.7 1.0X shuffle hash join wholestage on 436 465 45 9.6 103.9 1.5X ``` Disable shuffled hash join codegen: ``` Running benchmark: shuffle hash join Running case: shuffle hash join wholestage off Stopped after 2 iterations, 1345 ms Running case: shuffle hash join wholestage on Stopped after 5 iterations, 2967 ms Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4 Intel(R) Core(TM) i9-9980HK CPU 2.40GHz shuffle hash join: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ shuffle hash join wholestage off 646 673 37 6.5 154.1 1.0X shuffle hash join wholestage on 549 594 47 7.6 130.9 1.2X ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit test in `WholeStageCodegenSuite`. Closes #29277 from c21/codegen. Authored-by: Cheng Su <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…tests ### What changes were proposed in this pull request? While I'm implementing SPARK-32032 I've found a bug in Kafka: https://issues.apache.org/jira/browse/KAFKA-10318. This will cause issues only later when it's fixed but it would be good to fix it now because SPARK-32032 would like to bring in `AdminClient` where the code blows up with the mentioned `ConfigException`. This would reduce the code changes in the mentioned jira. In this PR I've changed `default.api.timeout.ms` to `request.timeout.ms` which fulfils this condition. ### Why are the changes needed? Solve later problems and reduce SPARK-32032 PR size. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unit tests. Closes #29272 from gaborgsomogyi/SPARK-32468. Authored-by: Gabor Somogyi <[email protected]> Signed-off-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
… create SparkContext in executors ### What changes were proposed in this pull request? This is a follow-up of #28986. This PR adds a config to switch allow/disallow to create `SparkContext` in executors. - `spark.driver.allowSparkContextInExecutors` ### Why are the changes needed? Some users or libraries actually create `SparkContext` in executors. We shouldn't break their workloads. ### Does this PR introduce _any_ user-facing change? Yes, users will be able to create `SparkContext` in executors with the config enabled. ### How was this patch tested? More tests are added. Closes #29278 from ueshin/issues/SPARK-32160/add_configs. Authored-by: Takuya UESHIN <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
…configs ### What changes were proposed in this pull request? This followup addresses comments from #29202 (comment) 1. make RESET static SQL configs/spark core configs fail as same as the SET command. Not that, for core ones, they have to be pre-registered, otherwise, they are still able to be SET/RESET 2. add test cases for configurations w/ optional default values ### Why are the changes needed? behavior change with suggestions from PMCs ### Does this PR introduce _any_ user-facing change? Yes, RESET will fail after this PR, before it just does nothing because the static ones are static. ### How was this patch tested? add more tests. Closes #29297 from yaooqinn/SPARK-32406-F. Authored-by: Kent Yao <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
… id instead of the object name ### What changes were proposed in this pull request? Just few log lines fixes which are logging the object name instead of the stage IDs ### Why are the changes needed? This would make it easier later for debugging. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Just log messages. Existing tests should be enough Closes #29279 from venkata91/SPARK-31418-follow-up. Authored-by: Venkata krishnan Sowrirajan <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
### What changes were proposed in this pull request? Address comment in #28707 (comment) ### Why are the changes needed? Hide the implementation details in the config doc. ### Does this PR introduce _any_ user-facing change? Config doc change. ### How was this patch tested? Document only. Closes #29315 from xuanyuanking/SPARK-31894-follow. Authored-by: Yuanjian Li <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request? This PR updates the AQE framework to at least return one partition during coalescing. This PR also updates `ShuffleExchangeExec.canChangeNumPartitions` to not coalesce for `SinglePartition`. ### Why are the changes needed? It's a bit risky to return 0 partitions, as sometimes it's different from empty data. For example, global aggregate will return one result row even if the input table is empty. If there is 0 partition, no task will be run and no result will be returned. More specifically, the global aggregate requires `AllTuples` and we can't coalesce to 0 partitions. This is not a real bug for now. The global aggregate will be planned as partial and final physical agg nodes. The partial agg will return at least one row, so that the shuffle still have data. But it's better to fix this issue to avoid potential bugs in the future. According to #28916, this change also fix some perf problems. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? updated test. Closes #29307 from cloud-fan/aqe. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
fqaiser94
pushed a commit
that referenced
this pull request
Jul 31, 2020
…chmarks ### What changes were proposed in this pull request? Replace `CAST(... AS TIMESTAMP` by `TIMESTAMP_SECONDS` in the following benchmarks: - ExtractBenchmark - DateTimeBenchmark - FilterPushdownBenchmark - InExpressionBenchmark ### Why are the changes needed? The benchmarks fail w/o the changes: ``` [info] Running benchmark: datetime +/- interval [info] Running case: date + interval(m) [error] Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(`id` AS TIMESTAMP)' due to data type mismatch: cannot cast bigint to timestamp,you can enable the casting by setting spark.sql.legacy.allowCastNumericToTimestamp to true,but we strongly recommend using function TIMESTAMP_SECONDS/TIMESTAMP_MILLIS/TIMESTAMP_MICROS instead.; line 1 pos 5; [error] 'Project [(cast(cast(id#0L as timestamp) as date) + 1 months) AS (CAST(CAST(id AS TIMESTAMP) AS DATE) + INTERVAL '1 months')#2] [error] +- Range (0, 10000000, step=1, splits=Some(1)) ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By running the affected benchmarks. Closes apache#28843 from MaxGekk/GuoPhilipse-31710-fix-compatibility-followup. Authored-by: Max Gekk <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
No description provided.