Update to latest version #4
Merged
## What changes were proposed in this pull request? Update Pandas UDFs section in sql-programming-guide. Add section for grouped aggregate pandas UDF. ## How was this patch tested? Author: Li Jin <[email protected]> Closes #21887 from icexelloss/SPARK-23633-sql-programming-guide.
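A minimal sketch of the kind of grouped aggregate Pandas UDF the updated guide section documents (requires PyArrow; the example DataFrame and column names below are illustrative, not taken from the guide):
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)], ("id", "v"))

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    # v is a pandas.Series holding all values of the group
    return v.mean()

df.groupBy("id").agg(mean_udf(df["v"]).alias("mean_v")).show()
```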
…ex types ## What changes were proposed in this pull request? When the pivot column is of a complex type, the eval() result will be an UnsafeRow, while the keys of the HashMap used for column value matching are GenericInternalRows. As a result, there will be no match and the result will always be empty. So for a pivot column of complex types, we should: 1) If the complex type is not comparable (orderable), throw an exception; it cannot be a pivot column. 2) Otherwise, if it goes through the `PivotFirst` code path, `PivotFirst` should use a TreeMap instead of a HashMap for such columns. This PR also reverts the workaround in Analyzer that had been introduced to avoid this `PivotFirst` issue. ## How was this patch tested? Added UT. Author: maryannxue <[email protected]> Closes #21926 from maryannxue/pivot_followup.
## What changes were proposed in this pull request? This change makes nullable expressions invalid in the LIMIT clause. ## How was this patch tested? It was tested with unit and e2e tests. Author: Mauro Palsgraaf <[email protected]> Closes #21807 from mauropalsgraaf/SPARK-24536.
…tClassifier.featureSubsetStrategy well ## What changes were proposed in this pull request? Update the doc of RandomForestClassifier.featureSubsetStrategy. ## How was this patch tested? Locally built the doc (R doc and Python doc). Author: zhengruifeng <[email protected]> Closes #21788 from zhengruifeng/rf_doc_py_r.
## What changes were proposed in this pull request? This PR upgrades to the Kafka 2.0.0 release, where KIP-266 is integrated. ## How was this patch tested? This PR relies on the existing Kafka-related unit tests. Author: tedyu <[email protected]> Closes #21488 from tedyu/master.
## What changes were proposed in this pull request? Add numIter to Python version of ClusteringSummary ## How was this patch tested? Modified existing UT test_multiclass_logistic_regression_summary Author: Huaxin Gao <[email protected]> Closes #21925 from huaxingao/spark-24973.
…c to PyArrow 0.9.0) ## What changes were proposed in this pull request? See [ARROW-2432](https://jira.apache.org/jira/browse/ARROW-2432). Seems using `from_pandas` to convert decimals fails if encounters a value of `None`: ```python import pyarrow as pa import pandas as pd from decimal import Decimal pa.Array.from_pandas(pd.Series([Decimal('3.14'), None]), type=pa.decimal128(3, 2)) ``` **Arrow 0.8.0** ``` <pyarrow.lib.Decimal128Array object at 0x10a572c58> [ Decimal('3.14'), NA ] ``` **Arrow 0.9.0** ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas File "array.pxi", line 177, in pyarrow.lib.array File "error.pxi", line 77, in pyarrow.lib.check_status File "error.pxi", line 77, in pyarrow.lib.check_status pyarrow.lib.ArrowInvalid: Error converting from Python objects to Decimal: Got Python object of type NoneType but can only handle these types: decimal.Decimal ``` This PR propose to work around this via Decimal NaN: ```python pa.Array.from_pandas(pd.Series([Decimal('3.14'), Decimal('NaN')]), type=pa.decimal128(3, 2)) ``` ``` <pyarrow.lib.Decimal128Array object at 0x10ffd2e68> [ Decimal('3.14'), NA ] ``` ## How was this patch tested? Manually tested: ```bash SPARK_TESTING=1 ./bin/pyspark pyspark.sql.tests ScalarPandasUDFTests ``` **Before** ``` Traceback (most recent call last): File "/.../spark/python/pyspark/sql/tests.py", line 4672, in test_vectorized_udf_null_decimal self.assertEquals(df.collect(), res.collect()) File "/.../spark/python/pyspark/sql/dataframe.py", line 533, in collect sock_info = self._jdf.collectToPython() File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling o51.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 1 times, most recent failure: Lost task 3.0 in stage 1.0 (TID 7, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/.../spark/python/pyspark/worker.py", line 320, in main process() File "/.../spark/python/pyspark/worker.py", line 315, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/.../spark/python/pyspark/serializers.py", line 274, in dump_stream batch = _create_batch(series, self._timezone) File "/.../spark/python/pyspark/serializers.py", line 243, in _create_batch arrs = [create_array(s, t) for s, t in series] File "/.../spark/python/pyspark/serializers.py", line 241, in create_array return pa.Array.from_pandas(s, mask=mask, type=t) File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas File "array.pxi", line 177, in pyarrow.lib.array File "error.pxi", line 77, in pyarrow.lib.check_status File "error.pxi", line 77, in pyarrow.lib.check_status ArrowInvalid: Error converting from Python objects to Decimal: Got Python object of type NoneType but can only handle these types: decimal.Decimal ``` **After** ``` Running tests... ---------------------------------------------------------------------- Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). .......S............................. 
---------------------------------------------------------------------- Ran 37 tests in 21.980s ``` Author: hyukjinkwon <[email protected]> Closes #21928 from HyukjinKwon/SPARK-24976.
…semantic equivalence ## What changes were proposed in this pull request? Similar to SPARK-24890, if all the outputs of `CaseWhen` are semantically equivalent, `CaseWhen` can be removed. ## How was this patch tested? Tests added. Author: DB Tsai <[email protected]> Closes #21852 from dbtsai/short-circuit-when.
## What changes were proposed in this pull request? Previously TVF resolution could throw IllegalArgumentException if the data type is null type. This patch replaces that exception with AnalysisException, enriched with positional information, to improve error message reporting and to be more consistent with rest of Spark SQL. ## How was this patch tested? Updated the test case in table-valued-functions.sql.out, which is how I identified this problem in the first place. Author: Reynold Xin <[email protected]> Closes #21934 from rxin/SPARK-24951.
## What changes were proposed in this pull request? When a user calls a UDAF with the wrong number of arguments, Spark previously threw an AssertionError, which is not supposed to be a user-facing exception. This patch updates it to throw AnalysisException instead, so it is consistent with a regular UDF. ## How was this patch tested? Updated test case udaf.sql. Author: Reynold Xin <[email protected]> Closes #21938 from rxin/SPARK-24982.
…stener. There is a narrow race in this code that is caused when the code being run in assertSpilled / assertNotSpilled runs more than a single job. SpillListener assumed that only a single job was run, and so would only block waiting for that single job to finish when `numSpilledStages` was called. But some tests (like SQL tests that call `checkAnswer`) run more than one job, and so that wait was basically a no-op. This could cause the next test to install a listener that receives events from the previous job, which could cause test failures in certain cases. The change fixes that race, and also uninstalls listeners after the test runs, so they don't accumulate when the SparkContext is shared among multiple tests. Author: Marcelo Vanzin <[email protected]> Closes #21639 from vanzin/SPARK-24653.
## What changes were proposed in this pull request? This is a follow up of #21118 . In #21118 we added `SupportsDeprecatedScanRow`. Ideally data source should produce `InternalRow` instead of `Row` for better performance. We should remove `SupportsDeprecatedScanRow` and encourage data sources to produce `InternalRow`, which is also very easy to build. ## How was this patch tested? existing tests. Author: Wenchen Fan <[email protected]> Closes #21921 from cloud-fan/row.
## What changes were proposed in this pull request? The PR adds the SQL function `array_except`. The behavior of the function is based on Presto's. This function returns an array of the elements in array1 but not in array2. Note: The order of elements in the result is not defined. ## How was this patch tested? Added UTs. Author: Kazuaki Ishizaki <[email protected]> Closes #21103 from kiszk/SPARK-23915.
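A hedged usage sketch of the new function (assumes a Spark build that includes this change):
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Elements of the first array that are not in the second; element order
# in the result is not defined.
spark.sql("SELECT array_except(array(1, 2, 3), array(2, 3, 4)) AS diff").show()
# diff is expected to contain only 1
```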
https://issues.apache.org/jira/browse/SPARK-24960 ## What changes were proposed in this pull request? Expose ports explicitly in the driver container. The driver Service created expects to reach the driver Pod at specific ports which before this change, were not explicitly exposed and would likely cause connection issues (see apache-spark-on-k8s#617). This is a port of the original PR created in the now-deprecated Kubernetes fork: apache-spark-on-k8s#618 ## How was this patch tested? Failure in apache-spark-on-k8s#617 reproduced on Kubernetes 1.6.x and 1.8.x. Built the driver image with this patch and observed fixed apache-spark-on-k8s#617 on Kubernetes 1.6.x. Author: Adelbert Chang <[email protected]> Closes #21884 from adelbertc/k8s-expose-driver-ports.
…c partitions
## What changes were proposed in this pull request?
How to reproduce:
```sql
spark-sql> CREATE TABLE tbl AS SELECT 1;
spark-sql> CREATE TABLE tbl1 (c1 BIGINT, day STRING, hour STRING)
> USING parquet
> PARTITIONED BY (day, hour);
spark-sql> INSERT INTO TABLE tbl1 PARTITION (day = '2018-07-25', hour='01') SELECT * FROM tbl where 1=0;
spark-sql> SHOW PARTITIONS tbl1;
spark-sql> CREATE TABLE tbl2 (c1 BIGINT)
> PARTITIONED BY (day STRING, hour STRING);
spark-sql> INSERT INTO TABLE tbl2 PARTITION (day = '2018-07-25', hour='01') SELECT * FROM tbl where 1=0;
spark-sql> SHOW PARTITIONS tbl2;
day=2018-07-25/hour=01
spark-sql>
```
1. Users will be confused about whether the partition data of `tbl1` is generated.
2. Inconsistent with Hive table behavior.
This PR fixes these issues.
## How was this patch tested?
unit tests
Author: Yuming Wang <[email protected]>
Closes #21883 from wangyum/SPARK-24937.
## What changes were proposed in this pull request? Regarding user-specified schema, data sources may have 3 different behaviors: 1. must have a user-specified schema 2. can't have a user-specified schema 3. can accept the user-specified schema if it's given, or infer the schema. I added `ReadSupportWithSchema` to support these behaviors, following data source v1. But it turns out we don't need this extra interface. We can just add a `createReader(schema, options)` to `ReadSupport` and make it call `createReader(options)` by default. TODO: also fix the streaming API in followup PRs. ## How was this patch tested? existing tests. Author: Wenchen Fan <[email protected]> Closes #21946 from cloud-fan/ds-schema.
…guration ## What changes were proposed in this pull request? This PR adds `spark.broadcast.checksum` to the configuration. ## How was this patch tested? Manually tested. Author: liuxian <[email protected]> Closes #21825 from 10110346/checksum_config.
## What changes were proposed in this pull request? In response to a recent question, this reiterates that network access to a Spark cluster should be disabled by default, and that access to its hosts and services from outside a private network should be added back explicitly. Also, some minor touch-ups while I was at it. ## How was this patch tested? N/A Author: Sean Owen <[email protected]> Closes #21947 from srowen/SecurityNote.
## What changes were proposed in this pull request? This PR refactors the code in AVERAGE using the DSL. ## How was this patch tested? N/A Author: Xiao Li <[email protected]> Closes #21951 from gatorsmile/refactor1.
## What changes were proposed in this pull request? ClusteringEvaluator supports array input. ## How was this patch tested? Added tests. Author: zhengruifeng <[email protected]> Closes #21563 from zhengruifeng/clu_eval_support_array.
… barrier stage fail ## What changes were proposed in this pull request? Kill all running tasks when a task in a barrier stage fails in the middle. `TaskScheduler`.`cancelTasks()` would also fail the job, so we implemented a new method `killAllTaskAttempts()` to just kill all running tasks of a stage without cancelling the stage/job. ## How was this patch tested? Added new test cases in `TaskSchedulerImplSuite`. Author: Xingbo Jiang <[email protected]> Closes #21943 from jiangxb1987/killAllTasks.
## What changes were proposed in this pull request? This PR addresses issues 2,3 in this [document](https://docs.google.com/document/d/1fbkjEL878witxVQpOCbjlvOvadHtVjYXeB-2mgzDTvk). * We modified the closure cleaner to identify closures that are implemented via the LambdaMetaFactory mechanism (serializedLambdas) (issue2). * We also fix the issue due to scala/bug#11016. There are two options for solving the Unit issue, either add () at the end of the closure or use the trick described in the doc. Otherwise overloading resolution does not work (we are not going to eliminate either of the methods) here. Compiler tries to adapt to Unit and makes these two methods candidates for overloading, when there is polymorphic overloading there is no ambiguity (that is the workaround implemented). This does not look that good but it serves its purpose as we need to support two different uses for method: `addTaskCompletionListener`. One that passes a TaskCompletionListener and one that passes a closure that is wrapped with a TaskCompletionListener later on (issue3). Note: regarding issue 1 in the doc the plan is: > Do Nothing. Don’t try to fix this as this is only a problem for Java users who would want to use 2.11 binaries. In that case they can cast to MapFunction to be able to utilize lambdas. In Spark 3.0.0 the API should be simplified so that this issue is removed. ## How was this patch tested? This was manually tested: ```./dev/change-scala-version.sh 2.12 ./build/mvn -DskipTests -Pscala-2.12 clean package ./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.serializer.ProactiveClosureSerializationSuite -Dtest=None ./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.util.ClosureCleanerSuite -Dtest=None ./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.streaming.DStreamClosureSuite -Dtest=None``` Author: Stavros Kontopoulos <[email protected]> Closes #21930 from skonto/scala2.12-sup.
## What changes were proposed in this pull request? Remove the AnalysisBarrier LogicalPlan node, which is useless now. ## How was this patch tested? N/A Author: Xiao Li <[email protected]> Closes #21962 from gatorsmile/refactor2.
## What changes were proposed in this pull request? This pull request provides a fix for SPARK-24742: SQL Field MetaData was throwing an Exception in the hashCode method when a "null" Metadata was added via "putNull" ## How was this patch tested? A new unittest is provided in org/apache/spark/sql/types/MetadataSuite.scala Author: Kaya Kupferschmidt <[email protected]> Closes #21722 from kupferk/SPARK-24742.
## What changes were proposed in this pull request? This addresses https://github.com/apache/spark/pull/21236/files#r207078480: both #21236 and #21838 add an InternalRow result check to ExpressionEvalHelper, which becomes duplicated. ## How was this patch tested? N/A Author: Wenchen Fan <[email protected]> Closes #21958 from cloud-fan/minor.
…hmetic operations cause overflow ## What changes were proposed in this pull request? According to the discussion in #21599, changing the behavior of arithmetic operations so that they can check for overflow is not nice in a minor release. What we can do for 2.4 is warn users about the current behavior in the documentation, so that they are aware of the issue and can take proper actions. ## How was this patch tested? NA Author: Marco Gaido <[email protected]> Closes #21967 from mgaido91/SPARK-24598_doc.
…s a barrier stage with unsupported RDD chain pattern ## What changes were proposed in this pull request? Check on job submit to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The following patterns are not supported: - Ancestor RDDs that have different number of partitions from the resulting RDD (eg. union()/coalesce()/first()/PartitionPruningRDD); - An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)). ## How was this patch tested? Add test cases in `BarrierStageOnSubmittedSuite`. Author: Xingbo Jiang <[email protected]> Closes #21927 from jiangxb1987/SPARK-24820.
## What changes were proposed in this pull request? This pr fixes lint-python. ``` ./python/pyspark/accumulators.py:231:9: E306 expected 1 blank line before a nested definition, found 0 ./python/pyspark/accumulators.py:257:101: E501 line too long (107 > 100 characters) ./python/pyspark/accumulators.py:264:1: E302 expected 2 blank lines, found 1 ./python/pyspark/accumulators.py:281:1: E302 expected 2 blank lines, found 1 ``` ## How was this patch tested? Executed lint-python manually. Author: Takuya UESHIN <[email protected]> Closes #21973 from ueshin/issues/build/1/fix_lint-python.
…register itself from rm ## What changes were proposed in this pull request? When using older Spark releases, a use case generated a huge code-gen file which hit the limitation `Constant pool has grown past JVM limit of 0xFFFF`. In this situation, it should fail immediately. But the diagnosis message sent to the RM was too large, the ApplicationMaster was suspended and the RM's ZKStateStore crashed. For Spark 2.3 or later the code-gen limitation has been removed, but there may still be uncaught exceptions containing oversized error messages that cause such a problem. This PR aims to cut down the diagnosis message size. ## How was this patch tested? Closes #22180 from yaooqinn/SPARK-25174. Authored-by: Kent Yao <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]>
## What changes were proposed in this pull request? `parallelize` uses integer multiplication to determine the split indices. It might cause integer overflow. ## How was this patch tested? unit test Closes #22225 from mengxr/SPARK-25234. Authored-by: Xiangrui Meng <[email protected]> Signed-off-by: Xiangrui Meng <[email protected]>
Closes #22189 from movrsprbp/patch-1. Authored-by: jaroslav chládek <[email protected]> Signed-off-by: hyukjinkwon <[email protected]>
## What changes were proposed in this pull request? [SPARK-25095](ad45299) introduced `ambiguous reference to overloaded definition` ``` [error] /Users/d_tsai/dev/apache-spark/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:242: ambiguous reference to overloaded definition, [error] both method addTaskCompletionListener in class TaskContext of type [U](f: org.apache.spark.TaskContext => U)org.apache.spark.TaskContext [error] and method addTaskCompletionListener in class TaskContext of type (listener: org.apache.spark.util.TaskCompletionListener)org.apache.spark.TaskContext [error] match argument types (org.apache.spark.TaskContext => Unit) [error] context.addTaskCompletionListener(_ => server.close()) [error] ^ [error] one error found [error] Compile failed at Aug 24, 2018 1:56:06 PM [31.582s] ``` which fails the Scala 2.12 branch build. ## How was this patch tested? Existing tests Closes #22229 from dbtsai/fix-2.12-build. Authored-by: DB Tsai <[email protected]> Signed-off-by: hyukjinkwon <[email protected]>
## What changes were proposed in this pull request? An RDD is created using LabeledPoint, but the comment reads #LabeledPoint(feature, label). Although in the method ChiSquareTest.test the second parameter is the feature and the third parameter is the label, it is better to write label in front of feature here, because if an RDD is created using LabeledPoint, what we get are actually (label, feature) pairs. It is now changed to LabeledPoint(label, feature). The comments in the Scala and Java examples have the same typos. ## How was this patch tested? Tested. https://issues.apache.org/jira/browse/SPARK-24688 Author: Weizhe Huang 492816239qq.com Closes #21665 from uzmijnlm/my_change. Authored-by: Huangweizhe <[email protected]> Signed-off-by: Sean Owen <[email protected]>
…turn duplicated records when `failOnDataLoss=false` ## What changes were proposed in this pull request? This is a follow up PR for #22207 to fix a potential flaky test. `processAllAvailable` doesn't work for continuous processing so we should not use it for a continuous query. ## How was this patch tested? Jenkins. Closes #22230 from zsxwing/SPARK-25214-2. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Shixiong Zhu <[email protected]>
## What changes were proposed in this pull request? Improved the documentation for the datetime functions in `org.apache.spark.sql.functions` by adding details about the supported column input types, the column return type, behaviour on invalid input, supporting examples and clarifications. ## How was this patch tested? Manually testing each of the datetime functions with different input to ensure that the corresponding Javadoc/Scaladoc matches the behaviour of the function. Successfully ran the `unidoc` SBT process. Closes #20901 from abradbury/SPARK-23792. Authored-by: Adam Bradbury <[email protected]> Signed-off-by: Sean Owen <[email protected]>
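For illustration, a small example exercising two of the datetime functions whose documentation was improved (input values are arbitrary, not from the PR):
```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("2018-08-27",)], ["d"])
df.select(
    F.to_date("d").alias("date"),                      # string parsed to DateType
    F.date_add(F.to_date("d"), 7).alias("next_week"),  # date plus 7 days
).show()
```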
## What changes were proposed in this pull request?
`__version__` in `setup.py` is currently defined dynamically via `exec`, so the linter complains. Better to just switch the check off for this line for now.
**Before:**
```bash
$ python -m flake8 . --count --select=E9,F82 --show-source --statistics
./setup.py:37:11: F821 undefined name '__version__'
VERSION = __version__
^
1 F821 undefined name '__version__'
1
```
**After:**
```bash
$ python -m flake8 . --count --select=E9,F82 --show-source --statistics
0
```
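One possible per-line suppression, shown here only as a sketch of the approach described above (the exact change in the PR may differ; the path to the version file is an assumption):
```python
# setup.py (simplified): __version__ is defined dynamically via exec, so
# flake8's static analysis reports F821; a per-line noqa silences that check.
exec(open("pyspark/version.py").read())  # defines __version__ at runtime
VERSION = __version__  # noqa: F821
```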
## How was this patch tested?
Manually tested.
Closes #22235 from HyukjinKwon/SPARK-23698.
Authored-by: hyukjinkwon <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
… global limit ## What changes were proposed in this pull request? This is based on the discussion https://github.com/apache/spark/pull/16677/files#r212805327. As SQL standard doesn't mandate that a nested order by followed by a limit has to respect that ordering clause, this patch removes the `child.outputOrdering` check. ## How was this patch tested? Unit tests. Closes #22239 from viirya/improve-global-limit-parallelism-followup. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
… to configure the capacity of fast aggregation. ## What changes were proposed in this pull request? This PR adds a configuration parameter to configure the capacity of fast aggregation. Performance comparison: ``` Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1 Intel64 Family 6 Model 94 Stepping 3, GenuineIntel Aggregate w multiple keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ fasthash = default 5612 / 5882 3.7 267.6 1.0X fasthash = config 3586 / 3595 5.8 171.0 1.6X ``` ## How was this patch tested? The existing test cases. Closes #21931 from heary-cao/FastHashCapacity. Authored-by: caoxuewen <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
## What changes were proposed in this pull request? This PR adds a unit test for OpenHashMap; this can help developers distinguish between 0/0.0/0L and null. ## How was this patch tested? Closes #22241 from 10110346/openhashmap. Authored-by: liuxian <[email protected]> Signed-off-by: Sean Owen <[email protected]>
## What changes were proposed in this pull request? Fix flaky synchronization in Kafka tests - we need to use the scan config that was persisted rather than reconstructing it to identify the stream's current configuration. We caught most instances of this in the original PR, but this one slipped through. ## How was this patch tested? n/a Closes #22245 from jose-torres/fixflake. Authored-by: Jose Torres <[email protected]> Signed-off-by: Shixiong Zhu <[email protected]>
…filesystem explicitly specified by the user ## What changes were proposed in this pull request? Our HDFS cluster is configured with 5 nameservices: `nameservices1`, `nameservices2`, `nameservices3`, `nameservices-dev1` and `nameservices4`, but `nameservices-dev1` is unstable. So sometimes an error occurs and causes the entire job to fail since [SPARK-24149](https://issues.apache.org/jira/browse/SPARK-24149). I think it's best to add a switch here. ## How was this patch tested? Manual tests. Closes #21734 from wangyum/SPARK-24149. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]>
## What changes were proposed in this pull request? Updated documentation for Spark on Kubernetes for the upcoming 2.4.0. mccheah erikerlandson Closes #22224 from liyinan926/master. Authored-by: Yinan Li <[email protected]> Signed-off-by: Sean Owen <[email protected]>
## What changes were proposed in this pull request? The PR excludes Python UDF filters in FileSourceStrategy so that they don't cause the ExtractPythonUDF rule to throw an exception. It doesn't make sense to pass Python UDF filters to FileSourceStrategy anyway because they cannot be used as push-down filters. ## How was this patch tested? Added a new regression test. Closes #22104 from icexelloss/SPARK-24721-udf-filter. Authored-by: Li Jin <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
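A sketch of the situation being fixed (table path and UDF are illustrative): a filter backed by a Python UDF cannot be pushed down to the file source, so it must be evaluated after the scan rather than handed to FileSourceStrategy.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()
is_even = udf(lambda x: x is not None and x % 2 == 0, "boolean")

# Write and re-read a small Parquet dataset so the query goes through
# FileSourceStrategy; the path is a hypothetical temp location.
spark.range(10).write.mode("overwrite").parquet("/tmp/spark_25213_demo")
parquet_df = spark.read.parquet("/tmp/spark_25213_demo")
parquet_df.filter(is_even(parquet_df["id"])).show()  # UDF filter is not pushed down
```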
…nd SocketAuthHelper ## What changes were proposed in this pull request? Make sure TransportServer and SocketAuthHelper close the resources for all types of errors. ## How was this patch tested? Jenkins Closes #22210 from zsxwing/SPARK-25218. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Shixiong Zhu <[email protected]>
## What changes were proposed in this pull request? When the user uses Kafka transactions to write data, the offsets in Kafka will be non-consecutive; they will contain some transaction (commit or abort) markers. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the data returned by `poll`. However, as `seekToEnd` may move the offset point to these missing offsets, there are 4 possible corner cases we need to support: - The whole batch contains no data messages - The first offset in a batch is not a committed data message - The last offset in a batch is not a committed data message - There is a gap in the middle of a batch They are all covered by the new unit tests. ## How was this patch tested? The new unit tests. Closes #22042 from zsxwing/kafka-transaction-read. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Shixiong Zhu <[email protected]>
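A hedged reader-side sketch (topic and broker names are placeholders; this assumes the consumer's isolation level can be passed through with the `kafka.` option prefix like other consumer configs):
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Read only committed transactional records; aborted records and
# transaction markers leave gaps in the offsets, which this change handles.
stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")
          .option("kafka.isolation.level", "read_committed")
          .option("subscribe", "transactional-topic")
          .load())
```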
## What changes were proposed in this pull request? This PR implements the possibility of the user to override the maximum number of buckets when saving to a table. Currently the limit is a hard-coded 100k, which might be insufficient for large workloads. A new configuration entry is proposed: `spark.sql.bucketing.maxBuckets`, which defaults to the previous 100k. ## How was this patch tested? Added unit tests in the following spark.sql test suites: - CreateTableAsSelectSuite - BucketedWriteSuite Author: Fernando Pereira <[email protected]> Closes #21087 from ferdonline/enh/configurable_bucket_limit.
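A minimal sketch of overriding the limit; the configuration key follows the description above, and the bucket counts, table and column names are illustrative:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Raise the ceiling above the previous hard-coded 100k default.
spark.conf.set("spark.sql.bucketing.maxBuckets", 150000)

(spark.range(1000)
      .withColumnRenamed("id", "user_id")
      .write
      .bucketBy(120000, "user_id")   # would have exceeded the old 100k limit
      .sortBy("user_id")
      .saveAsTable("bucketed_users_demo"))
```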
…ct failure in YARN mode ## What changes were proposed in this pull request? YARN `AmIpFilter` adds a new parameter "RM_HA_URLS" to support RM HA, but Spark on YARN doesn't provide such a parameter, so redirects fail when running with RM HA. The detailed exception can be found in the JIRA. This fixes the issue by adding the "RM_HA_URLS" parameter. ## How was this patch tested? Local verification. Closes #22164 from jerryshao/SPARK-23679. Authored-by: jerryshao <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]>
## What changes were proposed in this pull request? In this PR, I propose to not perform recursive parallel listing of files in the `scanPartitions` method because it can cause a deadlock. Instead, I propose to run `scanPartitions` in parallel for top-level partitions only. ## How was this patch tested? I extended an existing test to trigger the deadlock. Author: Maxim Gekk <[email protected]> Closes #22233 from MaxGekk/fix-recover-partitions.
## What changes were proposed in this pull request? This adds `spark.executor.pyspark.memory` to configure Python's address space limit, [`resource.RLIMIT_AS`](https://docs.python.org/3/library/resource.html#resource.RLIMIT_AS). Limiting Python's address space allows Python to participate in memory management. In practice, we see fewer cases of Python taking too much memory because it doesn't know to run garbage collection. This results in YARN killing fewer containers. This also improves error messages so users know that Python is consuming too much memory: ``` File "build/bdist.linux-x86_64/egg/package/library.py", line 265, in fe_engineer fe_eval_rec.update(f(src_rec_prep, mat_rec_prep)) File "build/bdist.linux-x86_64/egg/package/library.py", line 163, in fe_comp comparisons = EvaluationUtils.leven_list_compare(src_rec_prep.get(item, []), mat_rec_prep.get(item, [])) File "build/bdist.linux-x86_64/egg/package/evaluationutils.py", line 25, in leven_list_compare permutations = sorted(permutations, reverse=True) MemoryError ``` The new pyspark memory setting is used to increase requested YARN container memory, instead of sharing overhead memory between python and off-heap JVM activity. ## How was this patch tested? Tested memory limits in our YARN cluster and verified that MemoryError is thrown. Author: Ryan Blue <[email protected]> Closes #21977 from rdblue/SPARK-25004-add-python-memory-limit.
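The snippet below illustrates the `resource.RLIMIT_AS` mechanism the new setting builds on; it is not the actual worker code, and the limit value is arbitrary (POSIX only):
```python
import resource

# Cap the process address space at 512 MiB.
soft, hard = resource.getrlimit(resource.RLIMIT_AS)
resource.setrlimit(resource.RLIMIT_AS, (512 * 1024 * 1024, hard))

try:
    buf = bytearray(1024 * 1024 * 1024)  # attempt a ~1 GiB allocation
except MemoryError:
    print("allocation rejected by the address-space limit")
```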
## What changes were proposed in this pull request? Support Filter in ConvertToLocalRelation, similar to how Project works. Additionally, in Optimizer, run ConvertToLocalRelation earlier to simplify the plan. This is good for very short queries which often are queries on local relations. ## How was this patch tested? New test. Manual benchmark. Author: Bogdan Raducanu <[email protected]> Author: Shixiong Zhu <[email protected]> Author: Yinan Li <[email protected]> Author: Li Jin <[email protected]> Author: s71955 <[email protected]> Author: DB Tsai <[email protected]> Author: jaroslav chládek <[email protected]> Author: Huangweizhe <[email protected]> Author: Xiangrui Meng <[email protected]> Author: hyukjinkwon <[email protected]> Author: Kent Yao <[email protected]> Author: caoxuewen <[email protected]> Author: liuxian <[email protected]> Author: Adam Bradbury <[email protected]> Author: Jose Torres <[email protected]> Author: Yuming Wang <[email protected]> Author: Liang-Chi Hsieh <[email protected]> Closes #22205 from bogdanrdc/local-relation-filter.
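A sketch of the kind of short query this targets (data and column names are illustrative): with Filter handled by ConvertToLocalRelation, a filter over a local relation can be folded during optimization instead of surviving into the physical plan.
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
local_df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "name"])
# The optimized plan should collapse toward a LocalRelation / LocalTableScan.
local_df.filter("id > 1").select("name").explain()
```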
…ameter ## What changes were proposed in this pull request? Fix the issue that minPartitions was not used in the method. This is a simple fix and I am not trying to make it complicated. The purpose is to still allow the user to control the parallelism through the value of minPartitions, as well as via the sc.defaultParallelism parameter. ## How was this patch tested? I have not provided an additional test since the fix is very straightforward. Closes #21638 from bomeng/22357. Lead-authored-by: Bo Meng <[email protected]> Co-authored-by: Bo Meng <[email protected]> Signed-off-by: Sean Owen <[email protected]>
## What changes were proposed in this pull request? Most of `HigherOrderFunction`s have the same `nullable` definition, ie. they are nullable when one of their arguments is nullable. The PR refactors it in order to avoid code duplication. ## How was this patch tested? NA Closes #22243 from mgaido91/MINOR_nullable_hof. Authored-by: Marco Gaido <[email protected]> Signed-off-by: hyukjinkwon <[email protected]>
## What changes were proposed in this pull request?
`toAvroType` converts a Spark data type to an Avro schema. It always appends the record name to the namespace, so it's impossible to have an Avro namespace independent of the record name.
When invoked with a spark data type like,
```scala
val sparkSchema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("address", StructType(Seq(
    StructField("city", StringType, nullable = false),
    StructField("state", StringType, nullable = false))),
    nullable = false)))

// map it to an avro schema with record name "employee" and top level namespace "foo.bar"
val avroSchema = SchemaConverters.toAvroType(sparkSchema, false, "employee", "foo.bar")

// result is
// avroSchema.getName = employee
// avroSchema.getNamespace = foo.bar.employee
// avroSchema.getFullname = foo.bar.employee.employee
```
The patch proposes to fix this so that the result is
```
avroSchema.getName = employee
avroSchema.getNamespace = foo.bar
avroSchema.getFullname = foo.bar.employee
```
## How was this patch tested?
New and existing unit tests.
Closes #22251 from arunmahadevan/avro-fix.
Authored-by: Arun Mahadevan <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
This eliminates some duplication in the code that connects to a server on localhost to talk directly to the JVM. It also gives consistent IPv6 and error handling. Two other incidental changes that shouldn't matter: 1) Python barrier tasks perform authentication immediately (rather than waiting for the BARRIER_FUNCTION indicator); 2) for `rdd._load_from_socket`, the timeout is only increased after authentication. Closes #22247 from squito/py_connection_refactor. Authored-by: Imran Rashid <[email protected]> Signed-off-by: hyukjinkwon <[email protected]>
## What changes were proposed in this pull request? Using some reflection tricks to merge Scala 2.11 and 2.12 codebase. ## How was this patch tested? Existing tests. Closes #22246 from dbtsai/repl. Lead-authored-by: DB Tsai <[email protected]> Co-authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: DB Tsai <[email protected]>
…and collecting Pandas DataFrames
## What changes were proposed in this pull request?
This changes the calls of `toPandas()` and `createDataFrame()` to use the Arrow stream format, when Arrow is enabled. Previously, Arrow data was written to byte arrays where each chunk is an output of the Arrow file format. This was mainly due to constraints at the time, and caused some overhead by writing the schema/footer on each chunk of data and then having to read multiple Arrow file inputs and concat them together.
Using the Arrow stream format improves on this by increasing performance, lowering memory overhead for the average case, and simplifying the code. Here are the details of this change:
**toPandas()**
_Before:_
Spark internal rows are converted to the Arrow file format; each group of records is a complete Arrow file which contains the schema and other metadata. Next a collect is done and an array of Arrow files is the result. After that each Arrow file is sent to the Python driver, which then loads each file and concatenates them into a single Arrow DataFrame.
_After:_
Spark internal rows are converted to ArrowRecordBatches directly, which is the simplest Arrow component for IPC data transfers. The driver JVM then immediately starts serving data to Python as an Arrow stream, sending the schema first. It then starts a Spark job with a custom handler that sends Arrow RecordBatches to Python. Partitions arriving in order are sent immediately, and out-of-order partitions are buffered until the ones that precede them come in. This improves performance, simplifies memory usage on executors, and improves the average memory usage on the JVM driver. Since the order of partitions must be preserved, the worst case is that the first partition will be the last to arrive, so all data must be buffered in memory until then. This case is no worse than before, when doing a full collect.
**createDataFrame()**
_Before:_
A Pandas DataFrame is split into parts and each part is made into an Arrow file. Then each file is prefixed by the buffer size and written to a temp file. The temp file is read and each Arrow file is parallelized as a byte array.
_After:_
A Pandas DataFrame is split into parts, then an Arrow stream is written to a temp file where each part is an ArrowRecordBatch. The temp file is read as a stream and the Arrow messages are examined. If the message is an ArrowRecordBatch, the data is saved as a byte array. After reading the file, each ArrowRecordBatch is parallelized as a byte array. This has slightly more processing than before because we must look at each Arrow message to extract the record batches, but performance ends up a little better. It is cleaner in the sense that IPC from Python to the JVM is done over a single Arrow stream.
## How was this patch tested?
Added new unit tests for the additions to ArrowConverters in Scala, existing tests for Python.
## Performance Tests - toPandas
Tests were run on a 4-node standalone cluster with 32 cores total, Ubuntu 14.04.1 and OpenJDK 8, measuring the wall clock time to execute `toPandas()` and taking the average best time of 5 runs/5 loops each.
Test code
```python
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand()).withColumn("x4", rand())
for i in range(5):
    start = time.time()
    _ = df.toPandas()
    elapsed = time.time() - start
```
Current Master | This PR
---------------------|------------
5.803557 | 5.16207
5.409119 | 5.133671
5.493509 | 5.147513
5.433107 | 5.105243
5.488757 | 5.018685
Avg Master | Avg This PR
------------------|--------------
5.5256098 | 5.1134364
Speedup of **1.08060595**
## Performance Tests - createDataFrame
Tests were run on a 4-node standalone cluster with 32 cores total, Ubuntu 14.04.1 and OpenJDK 8, measuring the wall clock time to execute `createDataFrame()` and get the first record, and taking the average best time of 5 runs/5 loops each.
Test code
```python
def run():
    pdf = pd.DataFrame(np.random.rand(10000000, 10))
    spark.createDataFrame(pdf).first()

for i in range(6):
    start = time.time()
    run()
    elapsed = time.time() - start
    gc.collect()
    print("Run %d: %f" % (i, elapsed))
```
Current Master | This PR
--------------------|----------
6.234608 | 5.665641
6.32144 | 5.3475
6.527859 | 5.370803
6.95089 | 5.479151
6.235046 | 5.529167
Avg Master | Avg This PR
---------------|----------------
6.4539686 | 5.4784524
Speedup of **1.178064192**
## Memory Improvements
**toPandas()**
The most significant improvement is reduction of the upper bound space complexity in the JVM driver. Before, the entire dataset was collected in the JVM first before sending it to Python. With this change, as soon as a partition is collected, the result handler immediately sends it to Python, so the upper bound is the size of the largest partition. Also, using the Arrow stream format is more efficient because the schema is written once per stream, followed by record batches. The schema is now only sent from the driver JVM to Python. Before, multiple Arrow files were used that each contained the schema. This duplicated schema was created in the executors, sent to the driver JVM, and then to Python, where all but the first one received were discarded.
I verified the upper bound limit by running a test that would collect data that would exceed the amount of driver JVM memory available. Using these settings on a standalone cluster:
```
spark.driver.memory 1g
spark.executor.memory 5g
spark.sql.execution.arrow.enabled true
spark.sql.execution.arrow.fallback.enabled false
spark.sql.execution.arrow.maxRecordsPerBatch 0
spark.driver.maxResultSize 2g
```
Test code:
```python
from pyspark.sql.functions import rand
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand())
df.toPandas()
```
This makes a total data size of 33554432 × 8 × 4 = 1073741824 bytes (1 GiB).
With the current master, it fails with OOM but passes using this PR.
**createDataFrame()**
No significant change in memory, except that using the stream format instead of separate file formats avoids duplicating the schema, similar to toPandas above. The process of reading the stream and parallelizing the batches does cause the record batch message metadata to be copied, but its size is insignificant.
Closes #21546 from BryanCutler/arrow-toPandas-stream-SPARK-23030.
Authored-by: Bryan Cutler <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
…primitive args to Object in Scala 2.12 ## What changes were proposed in this pull request? Alternative take on #22063 that does not introduce udfInternal. Resolve issue with inferring func types in 2.12 by instead using info captured when UDF is registered -- capturing which types are nullable (i.e. not primitive) ## How was this patch tested? Existing tests. Closes #22259 from srowen/SPARK-25044.2. Authored-by: Sean Owen <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
Merge latest version.