Update upstream#65
Merged
GulajavaMinistudio merged 8 commits intoGulajavaMinistudio:masterfrom Jun 1, 2017
Merged
Conversation
…park ## What changes were proposed in this pull request? Hadoop FileSystem's statistics in based on thread local variables, this is ok if the RDD computation chain is running in the same thread. But if child RDD creates another thread to consume the iterator got from Hadoop RDDs, the bytesRead computation will be error, because now the iterator's `next()` and `close()` may run in different threads. This could be happened when using PySpark with PythonRDD. So here building a map to track the `bytesRead` for different thread and add them together. This method will be used in three RDDs, `HadoopRDD`, `NewHadoopRDD` and `FileScanRDD`. I assume `FileScanRDD` cannot be called directly, so I only fixed `HadoopRDD` and `NewHadoopRDD`. ## How was this patch tested? Unit test and local cluster verification. Author: jerryshao <sshao@hortonworks.com> Closes #17617 from jerryshao/SPARK-20244.
## What changes were proposed in this pull request? Since [SPARK-9263](https://issues.apache.org/jira/browse/SPARK-9263), `resolveMavenCoordinates` ignores Spark and Spark's dependencies by using `addExclusionRules`. This PR aims to make [addExclusionRules](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L956-L974) up-to-date to neglect correctly because it fails to neglect some components like the following. **mllib (correct)** ``` $ bin/spark-shell --packages org.apache.spark:spark-mllib_2.11:2.1.1 ... --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 0 | 0 | 0 | 0 || 0 | 0 | --------------------------------------------------------------------- ``` **mllib-local (wrong)** ``` $ bin/spark-shell --packages org.apache.spark:spark-mllib-local_2.11:2.1.1 ... --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 15 | 2 | 2 | 0 || 15 | 2 | --------------------------------------------------------------------- ``` ## How was this patch tested? Pass the Jenkins with a updated test case. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #17947 from dongjoon-hyun/SPARK-20708.
## What changes were proposed in this pull request? Fix a few function description error. ## How was this patch tested? manual tests  Author: Yuming Wang <wgyumg@gmail.com> Closes #18157 from wangyum/DescIssues.
## What changes were proposed in this pull request? Add build-int SQL function - UUID. ## How was this patch tested? unit tests Author: Yuming Wang <wgyumg@gmail.com> Closes #18136 from wangyum/SPARK-20910.
## What changes were proposed in this pull request? - ~~I added the method `toBlockMatrixDense` to the IndexedRowMatrix class. The current implementation of `toBlockMatrix` is insufficient for users with relatively dense IndexedRowMatrix objects, since it assumes sparsity.~~ EDIT: Ended up deciding that there should be just a single `toBlockMatrix` method, which creates a BlockMatrix whose blocks may be dense or sparse depending on the sparsity of the rows. This method will work better on any current use case of `toBlockMatrix` and doesn't go through `CoordinateMatrix` like the old method. ## How was this patch tested? ~~I used the same tests already written for `toBlockMatrix()` to test this method. I also added a new additional unit test for an edge case that was not adequately tested by current test suite.~~ I ran the original `IndexedRowMatrix` tests, plus wrote more to better handle edge cases ignored by original tests. Author: John Compitello <johnc@broadinstitute.org> Closes #17459 from johnc1231/johnc-fix-ir-to-block.
### What changes were proposed in this pull request? Before this PR, Subquery reuse does not work. Below are three issues: - Subquery reuse does not work. - It is sharing the same `SQLConf` (`spark.sql.exchange.reuse`) with the one for Exchange Reuse. - No test case covers the rule Subquery reuse. This PR is to fix the above three issues. - Ignored the physical operator `SubqueryExec` when comparing two plans. - Added a dedicated conf `spark.sql.subqueries.reuse` for controlling Subquery Reuse - Added a test case for verifying the behavior ### How was this patch tested? N/A Author: Xiao Li <gatorsmile@gmail.com> Closes #18169 from gatorsmile/subqueryReuse.
In Spark on YARN, when configuring "spark.yarn.jars" with local jars (jars started with "local" scheme), we will get inaccurate classpath for AM and containers. This is because we don't remove "local" scheme when concatenating classpath. It is OK to run because classpath is separated with ":" and java treat "local" as a separate jar. But we could improve it to remove the scheme. Updated `ClientSuite` to check "local" is not in the classpath. cc jerryshao Author: Li Yichao <lyc@zhihu.com> Author: Li Yichao <liyichao.good@gmail.com> Closes #18129 from liyichao/SPARK-20365.
… by the launcher. Blindly deserializing classes using Java serialization opens the code up to issues in other libraries, since just deserializing data from a stream may end up execution code (think readObject()). Since the launcher protocol is pretty self-contained, there's just a handful of classes it legitimately needs to deserialize, and they're in just two packages, so add a filter that throws errors if classes from any other package show up in the stream. This also maintains backwards compatibility (the updated launcher code can still communicate with the backend code in older Spark releases). Tested with new and existing unit tests. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #18166 from vanzin/SPARK-20922.
GulajavaMinistudio
pushed a commit
that referenced
this pull request
Jan 15, 2021
…join can be planned as broadcast join
### What changes were proposed in this pull request?
Should not pushdown LeftSemi/LeftAnti over Aggregate for some cases.
```scala
spark.range(50000000L).selectExpr("id % 10000 as a", "id % 10000 as b").write.saveAsTable("t1")
spark.range(40000000L).selectExpr("id % 8000 as c", "id % 8000 as d").write.saveAsTable("t2")
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain
```
Before this pr:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
+- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#72]
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
+- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
:- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#65]
: +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
+- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#66]
+- HashAggregate(keys=[c#18L, d#19L], functions=[])
+- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#61]
+- HashAggregate(keys=[c#18L, d#19L], functions=[])
+- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
```
After this pr:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
+- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#74]
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
+- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
:- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#67]
: +- HashAggregate(keys=[a#16L, b#17L], functions=[])
: +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#61]
: +- HashAggregate(keys=[a#16L, b#17L], functions=[])
: +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
+- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#68]
+- HashAggregate(keys=[c#18L, d#19L], functions=[])
+- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#63]
+- HashAggregate(keys=[c#18L, d#19L], functions=[])
+- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
```
### Why are the changes needed?
1. Pushdown LeftSemi/LeftAnti over Aggregate will affect performance.
2. It will remove user added DISTINCT operator, e.g.: [q38](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q38.sql), [q87](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q87.sql).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test and benchmark test.
SQL | Before this PR(Seconds) | After this PR(Seconds)
-- | -- | --
q14a | 660 | 594
q14b | 660 | 600
q38 | 55 | 29
q87 | 66 | 35
Before this pr:

After this pr:

Closes apache#31145 from wangyum/SPARK-34081.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.