forked from apache/spark
Choosing splits returns None for split if no valid one found #11
Merged
Conversation
jkbradley (Owner, Author)
I'll go ahead and merge this, but let me know if it messes up your WIP code changes.
jkbradley added a commit that referenced this pull request on Oct 17, 2015
Choosing splits returns None for split if no valid one found
jkbradley pushed a commit that referenced this pull request on Mar 4, 2016
## What changes were proposed in this pull request?
This PR supports visualization for subqueries in the SQL web UI and also improves the explain output for subqueries, especially when they are used together with whole-stage codegen.
For example:
```python
>>> sqlContext.range(100).registerTempTable("range")
>>> sqlContext.sql("select id / (select sum(id) from range) from range where id > (select id from range limit 1)").explain(True)
== Parsed Logical Plan ==
'Project [unresolvedalias(('id / subquery#9), None)]
: +- 'SubqueryAlias subquery#9
: +- 'Project [unresolvedalias('sum('id), None)]
: +- 'UnresolvedRelation `range`, None
+- 'Filter ('id > subquery#8)
: +- 'SubqueryAlias subquery#8
: +- 'GlobalLimit 1
: +- 'LocalLimit 1
: +- 'Project [unresolvedalias('id, None)]
: +- 'UnresolvedRelation `range`, None
+- 'UnresolvedRelation `range`, None
== Analyzed Logical Plan ==
(id / scalarsubquery()): double
Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11]
: +- SubqueryAlias subquery#9
: +- Aggregate [(sum(id#0L),mode=Complete,isDistinct=false) AS sum(id)#10L]
: +- SubqueryAlias range
: +- Range 0, 100, 1, 4, [id#0L]
+- Filter (id#0L > subquery#8)
: +- SubqueryAlias subquery#8
: +- GlobalLimit 1
: +- LocalLimit 1
: +- Project [id#0L]
: +- SubqueryAlias range
: +- Range 0, 100, 1, 4, [id#0L]
+- SubqueryAlias range
+- Range 0, 100, 1, 4, [id#0L]
== Optimized Logical Plan ==
Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11]
: +- SubqueryAlias subquery#9
: +- Aggregate [(sum(id#0L),mode=Complete,isDistinct=false) AS sum(id)#10L]
: +- Range 0, 100, 1, 4, [id#0L]
+- Filter (id#0L > subquery#8)
: +- SubqueryAlias subquery#8
: +- GlobalLimit 1
: +- LocalLimit 1
: +- Project [id#0L]
: +- Range 0, 100, 1, 4, [id#0L]
+- Range 0, 100, 1, 4, [id#0L]
== Physical Plan ==
WholeStageCodegen
: +- Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11]
: : +- Subquery subquery#9
: : +- WholeStageCodegen
: : : +- TungstenAggregate(key=[], functions=[(sum(id#0L),mode=Final,isDistinct=false)], output=[sum(id)#10L])
: : : +- INPUT
: : +- Exchange SinglePartition, None
: : +- WholeStageCodegen
: : : +- TungstenAggregate(key=[], functions=[(sum(id#0L),mode=Partial,isDistinct=false)], output=[sum#14L])
: : : +- Range 0, 1, 4, 100, [id#0L]
: +- Filter (id#0L > subquery#8)
: : +- Subquery subquery#8
: : +- CollectLimit 1
: : +- WholeStageCodegen
: : : +- Project [id#0L]
: : : +- Range 0, 1, 4, 100, [id#0L]
: +- Range 0, 1, 4, 100, [id#0L]
```
The SQL web UI renders the subquery as part of the query visualization (screenshot omitted).
This PR also changes the tree structure of WholeStageCodegen to make it consistent with other operators. Before this change, both WholeStageCodegen and InputAdapter held references to the same plans; those could be updated without notifying the other, causing problems. This was discovered by apache#11403.
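As a hedged illustration of that hazard (toy classes, not Spark's actual operators), two tree nodes holding mutable references to the same child can silently diverge:
```scala
// Toy stand-ins for WholeStageCodegen / InputAdapter sharing a child plan.
class Plan(var name: String)
class InputAdapterSketch(var child: Plan)
class WholeStageCodegenSketch(var child: Plan, val adapter: InputAdapterSketch)

object SharedReferenceDemo extends App {
  val shared  = new Plan("Range")
  val adapter = new InputAdapterSketch(shared)
  val wsc     = new WholeStageCodegenSketch(shared, adapter)

  // A rewrite swaps the child held by the codegen node...
  wsc.child = new Plan("Filter over Range")

  // ...but the adapter still points at the old plan: the two views of the
  // tree have diverged without either side being notified.
  println(adapter.child.name) // prints "Range"
}
```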
## How was this patch tested?
Existing tests, plus manual tests with the example query, checking the explain output and the web UI.
Author: Davies Liu <[email protected]>
Closes apache#11417 from davies/viz_subquery.
jkbradley pushed a commit that referenced this pull request on Apr 28, 2016
CheckpointWriter.stop() is prone to a race condition: if one thread calls `stop()` right as a checkpoint write task begins to execute, that write task may become blocked when trying to access `fs`, the shared Hadoop FileSystem, since both the `fs` getter and `stop` method synchronize on the same lock. Here's a thread-dump excerpt which illustrates the problem:
```java
"pool-31-thread-1" #156 prio=5 os_prio=31 tid=0x00007fea02cd2000 nid=0x5c0b waiting for monitor entry [0x000000013bc4c000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.spark.streaming.CheckpointWriter.org$apache$spark$streaming$CheckpointWriter$$fs(Checkpoint.scala:302)
	- waiting to lock <0x00000007bf53ee78> (a org.apache.spark.streaming.CheckpointWriter)
	at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:224)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

"pool-1-thread-1-ScalaTest-running-MapWithStateSuite" #11 prio=5 os_prio=31 tid=0x00007fe9ff879800 nid=0x5703 waiting on condition [0x000000012e54c000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x00000007bf564568> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
	at org.apache.spark.streaming.CheckpointWriter.stop(Checkpoint.scala:291)
	- locked <0x00000007bf53ee78> (a org.apache.spark.streaming.CheckpointWriter)
	at org.apache.spark.streaming.scheduler.JobGenerator.stop(JobGenerator.scala:159)
	- locked <0x00000007bf53ea90> (a org.apache.spark.streaming.scheduler.JobGenerator)
	at org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:115)
	- locked <0x00000007bf53d3f0> (a org.apache.spark.streaming.scheduler.JobScheduler)
	at org.apache.spark.streaming.StreamingContext$$anonfun$stop$1.apply$mcV$sp(StreamingContext.scala:680)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1219)
	at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:679)
	- locked <0x00000007bf516a70> (a org.apache.spark.streaming.StreamingContext)
	at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:644)
	- locked <0x00000007bf516a70> (a org.apache.spark.streaming.StreamingContext)
	[...]
```
We can fix this problem by having `stop` and `fs` be synchronized on different locks: the synchronization on `stop` only needs to guard against multiple threads calling `stop` at the same time, whereas the synchronization on `fs` is only necessary for cross-thread visibility. There's only ever a single active checkpoint writer thread at a time, so we don't need to guard against concurrent access to `fs`. Thus, `fs` can simply become a `volatile` var, similar to `lastCheckpointTime`.

This change should fix [SPARK-13693](https://issues.apache.org/jira/browse/SPARK-13693), a flaky `MapWithStateSuite` test suite which has recently been failing several times per day. It also results in a huge test speedup: prior to this patch, `MapWithStateSuite` took about 80 seconds to run, whereas it now runs in less than 10 seconds. For the `streaming` project's tests as a whole, they now run in ~220 seconds vs. ~354 before.

/cc zsxwing and tdas for review.

Author: Josh Rosen <[email protected]>

Closes apache#12712 from JoshRosen/fix-checkpoint-writer-race.
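As a hedged sketch of the fix pattern (class and member names simplified here; the real code is in `streaming`'s `Checkpoint.scala`):
```scala
// Sketch only: a stand-in for org.apache.hadoop.fs.FileSystem.
trait FileSystem

class CheckpointWriterSketch(newFs: () => FileSystem) {
  // Before the fix, `fs` was read and lazily created inside a block
  // synchronized on `this` -- the same monitor stop() holds -- so a write
  // task could block behind a long-running stop(), as in the dump above.
  //
  // After the fix, @volatile provides the cross-thread visibility; no mutual
  // exclusion is needed because only one writer thread is active at a time.
  @volatile private var fs: FileSystem = _

  private def getFs(): FileSystem = {
    if (fs == null) fs = newFs()
    fs
  }

  // stop() stays synchronized only to guard against concurrent stop() calls.
  def stop(): Unit = synchronized {
    // shut down the write-task executor, awaitTermination, etc.
  }
}
```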
jkbradley pushed a commit that referenced this pull request on Aug 28, 2017
…pressions
## What changes were proposed in this pull request?
This PR changes the direction of expression transformation in the DecimalPrecision rule. Previously, the expressions were transformed down, which led to incorrect result types when decimal expressions had other decimal expressions as their operands. The root cause of this issue was in visiting outer nodes before their children. Consider the example below:
```scala
val inputSchema = StructType(StructField("col", DecimalType(26, 6)) :: Nil)
val sc = spark.sparkContext
val rdd = sc.parallelize(1 to 2).map(_ => Row(BigDecimal(12)))
val df = spark.createDataFrame(rdd, inputSchema)
// Works correctly since no nested decimal expression is involved
// Expected result type: (26, 6) * (26, 6) = (38, 12)
df.select($"col" * $"col").explain(true)
df.select($"col" * $"col").printSchema()
// Gives a wrong result since there is a nested decimal expression that should be visited first
// Expected result type: ((26, 6) * (26, 6)) * (26, 6) = (38, 12) * (26, 6) = (38, 18)
df.select($"col" * $"col" * $"col").explain(true)
df.select($"col" * $"col" * $"col").printSchema()
```
The example above gives the following output:
```
// Correct result without sub-expressions
== Parsed Logical Plan ==
'Project [('col * 'col) AS (col * col)#4]
+- LogicalRDD [col#1]
== Analyzed Logical Plan ==
(col * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]
== Optimized Logical Plan ==
Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]
== Physical Plan ==
*Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- Scan ExistingRDD[col#1]
// Schema
root
|-- (col * col): decimal(38,12) (nullable = true)
// Incorrect result with sub-expressions
== Parsed Logical Plan ==
'Project [(('col * 'col) * 'col) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]
== Analyzed Logical Plan ==
((col * col) * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]
== Optimized Logical Plan ==
Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]
== Physical Plan ==
*Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- Scan ExistingRDD[col#1]
// Schema
root
|-- ((col * col) * col): decimal(38,12) (nullable = true)
```
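To see why the bottom-up direction matters, here is a hedged, self-contained sketch over a toy decimal-expression tree; the node types and the simplified precision rule are illustrative stand-ins for Spark's `DecimalPrecision` logic:
```scala
// Toy decimal expression tree; Spark's real rule works on Catalyst expressions.
sealed trait Expr { def precision: Int; def scale: Int }
case class Col(precision: Int, scale: Int) extends Expr
case class Mul(left: Expr, right: Expr, precision: Int, scale: Int) extends Expr

// Simplified multiplication result type: precision = p1 + p2 + 1 capped at 38,
// scale = s1 + s2 (the real rule also trims the scale on precision overflow).
def multiplyType(l: Expr, r: Expr): (Int, Int) =
  (math.min(38, l.precision + r.precision + 1), l.scale + r.scale)

// Bottom-up (transformUp-style) resolution: children are fixed first, so the
// parent computes its type from the *adjusted* child types -- the direction
// this PR switches to. Transforming down would read stale child types.
def resolveUp(e: Expr): Expr = e match {
  case Mul(l, r, _, _) =>
    val (rl, rr) = (resolveUp(l), resolveUp(r))
    val (p, s) = multiplyType(rl, rr)
    Mul(rl, rr, p, s)
  case c: Col => c
}

// Reproducing the example above: ((26,6) * (26,6)) * (26,6).
// Inner: (38,12); outer: min(38, 38+26+1) = 38, 12+6 = 18 => decimal(38,18).
val col = Col(26, 6)
val resolved = resolveUp(Mul(Mul(col, col, 0, 0), col, 0, 0))
```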
## How was this patch tested?
This PR was tested with available unit tests. Moreover, there are tests to cover previously failing scenarios.
Author: aokolnychyi <[email protected]>
Closes apache#18583 from aokolnychyi/spark-21332.
jkbradley pushed a commit that referenced this pull request on Feb 21, 2019
…/`to_avro`
## What changes were proposed in this pull request?
Previously, `from_avro`/`to_avro` overrode the methods `simpleString` and `sql` for the string output. However, the override only affects the alias naming:
```
Project [from_avro('col,
...
, (mode,PERMISSIVE)) AS from_avro(col, struct<col1:bigint,col2:double>, Map(mode -> PERMISSIVE))#11]
```
This only makes the alias name quite long: `from_avro(col, struct<col1:bigint,col2:double>, Map(mode -> PERMISSIVE))`.
We should follow `from_csv`/`from_json` here and override only the method `prettyName`, which gives a clean alias name:
```
... AS from_avro(col)#11
```
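A hedged, self-contained sketch of the pattern (a toy expression hierarchy, not Spark's actual `Expression`/`AvroDataToCatalyst` API): overriding only the name hook shortens everything derived from it.
```scala
// Toy expression trait: the generated alias is derived from prettyName and
// the children only, mirroring the clean `from_avro(col)` output above.
trait Expr {
  def children: Seq[Expr]
  // Default name derived from the class name ("fromavro"), hence the override.
  def prettyName: String = getClass.getSimpleName.toLowerCase
  def aliasName: String =
    s"$prettyName(${children.map(_.aliasName).mkString(", ")})"
}

case class Column(name: String) extends Expr {
  def children: Seq[Expr] = Nil
  override def aliasName: String = name
}

// Schema and options remain fields but no longer leak into the alias.
case class FromAvro(child: Expr, schema: String, options: Map[String, String]) extends Expr {
  def children: Seq[Expr] = Seq(child)
  override def prettyName: String = "from_avro" // the single override kept
}

// FromAvro(Column("col"), "struct<col1:bigint,col2:double>",
//          Map("mode" -> "PERMISSIVE")).aliasName == "from_avro(col)"
```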
## How was this patch tested?
Manual check
Closes apache#22890 from gengliangwang/revise_from_to_avro.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
jkbradley pushed a commit that referenced this pull request on Jan 2, 2020
### Why are the changes needed?
`EnsureRequirements` adds a `ShuffleExchangeExec` (RangePartitioning) after Sort if `RoundRobinPartitioning` is behind it. This causes two shuffles, and the number of partitions in the final stage is not the number specified by `RoundRobinPartitioning`.

**Example SQL**
```
SELECT /*+ REPARTITION(5) */ * FROM test ORDER BY a
```
**BEFORE**
```
== Physical Plan ==
*(1) Sort [a#0 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#0 ASC NULLS FIRST, 200), true, [id=#11]
   +- Exchange RoundRobinPartitioning(5), false, [id=#9]
      +- Scan hive default.test [a#0, b#1], HiveTableRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#0, b#1]
```
**AFTER**
```
== Physical Plan ==
*(1) Sort [a#0 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#0 ASC NULLS FIRST, 5), true, [id=#11]
   +- Scan hive default.test [a#0, b#1], HiveTableRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#0, b#1]
```
### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Ran the existing suites and added a new test for this.

Closes apache#26946 from stczwd/RoundRobinPartitioning.
Lead-authored-by: lijunqing <[email protected]>
Co-authored-by: stczwd <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
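A hedged sketch of the rule's shape (toy plan nodes; the actual fix lives inside Spark's `EnsureRequirements`):
```scala
// Toy physical-plan nodes standing in for Spark's SparkPlan operators.
sealed trait Node
case class Scan(table: String) extends Node
case class RoundRobin(numPartitions: Int, child: Node) extends Node    // REPARTITION(n)
case class RangeExchange(numPartitions: Int, child: Node) extends Node // added for Sort
case class Sort(child: Node) extends Node

// When a range exchange is planned directly on top of a user-requested
// round-robin repartition, keep a single shuffle and honor the user's n.
def collapse(plan: Node): Node = plan match {
  case RangeExchange(_, RoundRobin(n, child)) => RangeExchange(n, collapse(child))
  case Sort(c)             => Sort(collapse(c))
  case RangeExchange(n, c) => RangeExchange(n, collapse(c))
  case RoundRobin(n, c)    => RoundRobin(n, collapse(c))
  case s: Scan             => s
}

// BEFORE: Sort(RangeExchange(200, RoundRobin(5, Scan("test"))))
// AFTER:  collapse(...) == Sort(RangeExchange(5, Scan("test")))
```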
Changed split choosing to return None for a split if no valid split could be found (i.e., all instances would go to one leaf and none to the other). Updated PartitionInfo.update accordingly.
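For reference, a hedged sketch of the shape of the change (`Split`, `chooseSplit`, and the gain helper here are illustrative stand-ins, not the actual implementation in this branch):
```scala
// Illustrative only: a threshold split over one numeric feature.
case class Split(threshold: Double, gain: Double)

// Hypothetical stand-in for the impurity-based gain computation.
def computeGain(values: Array[Double], labels: Array[Double], t: Double): Double = 0.0

// Choosing a split now returns Option[Split]: None when every candidate
// threshold sends all instances to one leaf and zero to the other.
def chooseSplit(values: Array[Double], labels: Array[Double]): Option[Split] = {
  val candidates = values.distinct.sorted.flatMap { t =>
    val left  = values.count(_ <= t)
    val right = values.length - left
    if (left == 0 || right == 0) None // degenerate split: skip
    else Some(Split(t, computeGain(values, labels, t)))
  }
  if (candidates.isEmpty) None else Some(candidates.maxBy(_.gain))
}

// PartitionInfo.update then treats None as "make this node a leaf" rather
// than applying a degenerate split.
```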
CC: @feynmanliang @fabuzaid21 Will this conflict with stuff you're doing, or can I merge it?
Btw, @fabuzaid21 after you send your current changes, you might want to change your fork of spark to be a fork of apache/spark rather than my branch. You can add mine as another remote.