Skip to content

Update upstream#109

Merged
GulajavaMinistudio merged 10 commits intoGulajavaMinistudio:masterfrom
apache:master
Jul 20, 2017
Merged

Update upstream#109
GulajavaMinistudio merged 10 commits intoGulajavaMinistudio:masterfrom
apache:master

Conversation

@GulajavaMinistudio
Copy link
Copy Markdown
Owner

What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

gatorsmile and others added 10 commits July 19, 2017 10:57
…tor pattern and mixin

## What changes were proposed in this pull request?
This PR is to add back the stats propagation of `Window` and remove the stats calculation of the leaf node `Range`, which has been covered by https://github.com/rxin/spark/blob/9c32d2507d3f4f269e17e841a4a4e4920b35a5e9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala#L56

## How was this patch tested?
Added two test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #18677 from gatorsmile/visitStats.
## What changes were proposed in this pull request?
The most of BoundedPriorityQueue usages in ML/MLLIB are:
Get the value of BoundedPriorityQueue, then sort it.
For example, in Word2Vec: pq.toSeq.sortBy(-_._2)
in ALS, pq.toArray.sorted()

The test results show using pq.poll is much faster than sort the value.
It is good to add the poll function for BoundedPriorityQueue.

## How was this patch tested?
The existing UT

Author: Peng <peng.meng@intel.com>
Author: Peng Meng <peng.meng@intel.com>

Closes #18620 from mpjlu/add-poll.
## What changes were proposed in this pull request?

In `SlidingWindowFunctionFrame`, it is now adding all rows to the buffer for which the input row value is equal to or less than the output row upper bound, then drop all rows from the buffer for which the input row value is smaller than the output row lower bound.
This could result in the buffer is very big though the window is small.
For example:
```
select a, b, sum(a)
over (partition by b order by a range between 1000000 following and 1000001 following)
from table
```
We can refine the logic and just add the qualified rows into buffer.

## How was this patch tested?
Manual test:
Run sql
`select shop, shopInfo, district, sum(revenue) over(partition by district order by revenue range between 100 following and 200 following) from revenueList limit 10`
against a table with 4  columns(shop: String, shopInfo: String, district: String, revenue: Int). The biggest partition is around 2G bytes, containing 200k lines.
Configure the executor with 2G bytes memory.
With the change in this pr, it works find. Without this change, below exception will be thrown.
```
MemoryError: Java heap space
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:504)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:62)
	at org.apache.spark.sql.execution.window.SlidingWindowFunctionFrame.write(WindowFunctionFrame.scala:201)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:365)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:289)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
```

Author: jinxing <jinxing6042@126.com>

Closes #18634 from jinxing64/SPARK-21414.
…lures in some cases

## What changes were proposed in this pull request?

https://issues.apache.org/jira/projects/SPARK/issues/SPARK-21441

This issue can be reproduced by the following example:

```
val spark = SparkSession
   .builder()
   .appName("smj-codegen")
   .master("local")
   .config("spark.sql.autoBroadcastJoinThreshold", "1")
   .getOrCreate()
val df1 = spark.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("key", "int")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"), (3, "3"))).toDF("key", "str")
val df = df1.join(df2, df1("key") === df2("key"))
   .filter("int = 2 or reflect('java.lang.Integer', 'valueOf', str) = 1")
   .select("int")
   df.show()
```

To conclude, the issue happens when:
(1) SortMergeJoin condition contains CodegenFallback expressions.
(2) In PhysicalPlan tree, SortMergeJoin node  is the child of root node, e.g., the Project in above example.

This patch fixes the logic in `CollapseCodegenStages` rule.

## How was this patch tested?
Unit test and manual verification in our cluster.

Author: donnyzone <wellfengzhu@gmail.com>

Closes #18656 from DonnyZone/Fix_SortMergeJoinExec.
…ime class

## What changes were proposed in this pull request?

Use of `ProcessingTime` class was deprecated in favor of `Trigger.ProcessingTime` in Spark 2.2. However interval uses to ProcessingTime causes deprecation warnings during compilation. This cannot be avoided entirely as even though it is deprecated as a public API, ProcessingTime instances are used internally in TriggerExecutor. This PR is to minimize the warning by removing its uses from tests as much as possible.

## How was this patch tested?
Existing tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #18678 from tdas/SPARK-21464.
## What changes were proposed in this pull request?
For configurations with external shuffle enabled, we have observed that if a very large no. of blocks are being fetched from a remote host, it puts the NM under extra pressure and can crash it. This change introduces a configuration `spark.reducer.maxBlocksInFlightPerAddress` , to limit the no. of map outputs being fetched from a given remote address. The changes applied here are applicable for both the scenarios - when external shuffle is enabled as well as disabled.

## How was this patch tested?
Ran the job with the default configuration which does not change the existing behavior and ran it with few configurations of lower values -10,20,50,100. The job ran fine and there is no change in the output. (I will update the metrics related to NM in some time.)

Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #18487 from dhruve/impr/SPARK-21243.
## What changes were proposed in this pull request?
JIRA Issue: https://issues.apache.org/jira/browse/SPARK-21446
options.asConnectionProperties can not have fetchsize,because fetchsize belongs to Spark-only options, and Spark-only options have been excluded in connection properities.
So change properties of beforeFetch from  options.asConnectionProperties.asScala.toMap to options.asProperties.asScala.toMap

## How was this patch tested?

Author: DFFuture <albert.zhang23@gmail.com>

Closes #18665 from DFFuture/sparksql_pg.
## What changes were proposed in this pull request?

Current behavior: in Mesos cluster mode, the driver failover_timeout is set to zero. If the driver temporarily loses connectivity with the Mesos master, the framework will be torn down and all executors killed.

Proposed change: make the failover_timeout configurable via a new option, spark.mesos.driver.failoverTimeout. The default value is still zero.

Note: with non-zero failover_timeout, an explicit teardown is needed in some cases. This is captured in https://issues.apache.org/jira/browse/SPARK-21458

## How was this patch tested?

Added a unit test to make sure the config option is set while creating the scheduler driver.

Ran an integration test with mesosphere/spark showing that with a non-zero failover_timeout the Spark job finishes after a driver is disconnected from the master.

Author: Susan X. Huynh <xhuynh@mesosphere.com>

Closes #18674 from susanxhuynh/sh-mesos-failover-timeout.
…#joinWith

## What changes were proposed in this pull request?

Two invalid join types were mistakenly listed in the javadoc for joinWith, in the Dataset class. I presume these were copied from the javadoc of join, but since joinWith returns a Dataset\<Tuple2\>, left_semi and left_anti are invalid, as they only return values from one of the datasets, instead of from both

## How was this patch tested?

I ran the following code :
```
public static void main(String[] args) {
	SparkSession spark = new SparkSession(new SparkContext("local[*]", "Test"));
	Dataset<Row> one = spark.createDataFrame(Arrays.asList(new Bean(1), new Bean(2), new Bean(3), new Bean(4), new Bean(5)), Bean.class);
	Dataset<Row> two = spark.createDataFrame(Arrays.asList(new Bean(4), new Bean(5), new Bean(6), new Bean(7), new Bean(8), new Bean(9)), Bean.class);

	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "inner").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "cross").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "outer").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "full").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "full_outer").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left_outer").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "right").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "right_outer").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left_semi").show();} catch (Exception e) {e.printStackTrace();}
	try {two.joinWith(one, one.col("x").equalTo(two.col("x")), "left_anti").show();} catch (Exception e) {e.printStackTrace();}
}
```
which tests all the different join types, and the last two (left_semi and left_anti) threw exceptions. The same code using join instead of joinWith did fine. The Bean class was just a java bean with a single int field, x.

Author: Corey Woodfield <coreywoodfield@gmail.com>

Closes #18462 from coreywoodfield/master.
…ce performed by MetadataLogFileIndex

## What changes were proposed in this pull request?

When using the MetadataLogFileIndex to read back a table, we don't respect the user provided schema as the proper column types. This can lead to issues when trying to read strings that look like dates that get truncated to DateType, or longs being truncated to IntegerType, just because a long value doesn't exist.

## How was this patch tested?

Unit tests and manual tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #18676 from brkyvz/stream-partitioning.
@GulajavaMinistudio GulajavaMinistudio merged commit 00bd984 into GulajavaMinistudio:master Jul 20, 2017
GulajavaMinistudio pushed a commit that referenced this pull request Apr 25, 2018
## What changes were proposed in this pull request?
Union of map and other compatible column result in unresolved operator 'Union; exception

Reproduction
`spark-sql>select map(1,2), 'str' union all select map(1,2,3,null), 1`
Output:
```
Error in query: unresolved operator 'Union;;
'Union
:- Project [map(1, 2) AS map(1, 2)#106, str AS str#107]
:  +- OneRowRelation$
+- Project [map(1, cast(2 as int), 3, cast(null as int)) AS map(1, CAST(2 AS INT), 3, CAST(NULL AS INT))#109, 1 AS 1#108]
   +- OneRowRelation$
```
So, we should cast part of columns to be compatible when appropriate.

## How was this patch tested?
Added a test (query union of map and other columns) to SQLQueryTestSuite's union.sql.

Author: liutang123 <liutang123@yeah.net>

Closes apache#21100 from liutang123/SPARK-24012.
GulajavaMinistudio pushed a commit that referenced this pull request Jul 20, 2020
### What changes were proposed in this pull request?

Currently `ShuffledHashJoin.outputPartitioning` inherits from `HashJoin.outputPartitioning`, which only preserves stream side partitioning (`HashJoin.scala`):

```
override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
```

This loses build side partitioning information, and causes extra shuffle if there's another join / group-by after this join.

Example:

```
withSQLConf(
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
    SQLConf.SHUFFLE_PARTITIONS.key -> "2",
    SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
  val df1 = spark.range(10).select($"id".as("k1"))
  val df2 = spark.range(30).select($"id".as("k2"))
  Seq("inner", "cross").foreach(joinType => {
    val plan = df1.join(df2, $"k1" === $"k2", joinType).groupBy($"k1").count()
      .queryExecution.executedPlan
    assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1)
    // No extra shuffle before aggregate
    assert(plan.collect { case _: ShuffleExchangeExec => true }.size === 2)
  })
}
```

Current physical plan (having an extra shuffle on `k1` before aggregate)

```
*(4) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
+- Exchange hashpartitioning(k1#220L, 2), true, [id=#117]
   +- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
      +- *(3) Project [k1#220L]
         +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
            :- Exchange hashpartitioning(k1#220L, 2), true, [id=#109]
            :  +- *(1) Project [id#218L AS k1#220L]
            :     +- *(1) Range (0, 10, step=1, splits=2)
            +- Exchange hashpartitioning(k2#224L, 2), true, [id=#111]
               +- *(2) Project [id#222L AS k2#224L]
                  +- *(2) Range (0, 30, step=1, splits=2)
```

Ideal physical plan (no shuffle on `k1` before aggregate)

```
*(3) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
+- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
   +- *(3) Project [k1#220L]
      +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
         :- Exchange hashpartitioning(k1#220L, 2), true, [id=#107]
         :  +- *(1) Project [id#218L AS k1#220L]
         :     +- *(1) Range (0, 10, step=1, splits=2)
         +- Exchange hashpartitioning(k2#224L, 2), true, [id=#109]
            +- *(2) Project [id#222L AS k2#224L]
               +- *(2) Range (0, 30, step=1, splits=2)
```

This can be fixed by overriding `outputPartitioning` method in `ShuffledHashJoinExec`, similar to `SortMergeJoinExec`.
In addition, also fix one typo in `HashJoin`, as that code path is shared between broadcast hash join and shuffled hash join.

### Why are the changes needed?

To avoid shuffle (for queries having multiple joins or group-by), for saving CPU and IO.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `JoinSuite`.

Closes apache#29130 from c21/shj.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
GulajavaMinistudio pushed a commit that referenced this pull request Oct 20, 2023
…ProjectExec

### What changes were proposed in this pull request?

This PR fixes a bug when there are subqueries in `TakeOrderedAndProjectExec`. The executeCollect method does not wait for subqueries to finish and it can result in IllegalArgumentException when executing a simple query.
For example this query:
```
WITH t2 AS (
  SELECT * FROM t1 ORDER BY id
)
SELECT *, (SELECT COUNT(*) FROM t2) FROM t2 LIMIT 10
```
will fail with this error
```
 java.lang.IllegalArgumentException: requirement failed: Subquery subquery#242, [id=#109] has not finished
```

### Why are the changes needed?

To fix a bug.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#43419 from allisonwang-db/spark-45584-subquery-failure.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants