Update upstream#49
Merged
GulajavaMinistudio merged 4 commits into GulajavaMinistudio:master on May 17, 2017
## What changes were proposed in this pull request?
We add missing attributes into Filter in the Analyzer, but we shouldn't do it through subqueries like this:
```sql
select 1 from (select 1 from onerow t1 LIMIT 1) where t1.c1=1
```
This query works in the current codebase. However, the outer WHERE clause shouldn't be able to refer to the `t1.c1` attribute.
The root cause is that we previously allowed subqueries in FROM to have no alias. This is confusing and isn't supported by databases such as MySQL, Postgres, and Oracle, so we shouldn't support it either.
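For comparison, the scoping rule that other databases enforce can be sketched with SQLite (not Spark): a derived table's columns are reachable from the outer query only through the alias the derived table declares.

```python
import sqlite3

con = sqlite3.connect(":memory:")

# With an alias on the derived table, the outer query can reference t1.c1.
rows = con.execute(
    "SELECT t1.c1 FROM (SELECT 1 AS c1) AS t1 WHERE t1.c1 = 1"
).fetchall()
print(rows)  # [(1,)]

# Referencing an alias that exists only inside the subquery is rejected.
try:
    con.execute("SELECT 1 FROM (SELECT 1 AS c1) WHERE t1.c1 = 1")
except sqlite3.OperationalError as e:
    print("error:", e)
```

This is the behavior the patch aligns Spark with: without an alias, there is no name through which the outer filter may reach the subquery's output.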
## How was this patch tested?
Jenkins tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes #17935 from viirya/SPARK-20690.
…kMetrics construction

## What changes were proposed in this pull request?
In
```
./bin/spark-shell --master=local[64]
```
I ran
```
sc.parallelize(1 to 100000, 100000).count()
```
and profiled the time spent in the LiveListenerBus event processing thread. I discovered that the majority of the time was being spent in `TaskMetrics.empty` calls in `JobProgressListener.onTaskStart`. It turns out that we can slightly refactor to remove the need to construct one empty instance per call, greatly improving the performance of this code. The performance gains here help to avoid an issue where listener events would be dropped because the JobProgressListener couldn't keep up with the throughput.

**Before:** (profiling screenshot omitted)

**After:** (profiling screenshot omitted)

## How was this patch tested?
Benchmarks described above.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #18008 from JoshRosen/nametoaccums-improvements.
## What changes were proposed in this pull request?
SPARK-13973 incorrectly removed the required `PYSPARK_DRIVER_PYTHON_OPTS=notebook` from the documentation for using pyspark with a Jupyter notebook. This patch corrects the documentation error.

## How was this patch tested?
Tested invocation locally with
```bash
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
```

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #18001 from aray/patch-1.
…g logs

## What changes were proposed in this pull request?
The executor task reaper may fail to detect whether a task is finished when the task is finishing and being killed at the same time. The fix is pretty easy: just flip the "finished" flag when a task is successful.

## How was this patch tested?
Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #18021 from zsxwing/SPARK-20788.
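The shape of the fix — marking completion as part of finishing, so a concurrent watcher never misreads a successful task as hung — can be sketched in Python. This is an illustrative analogue, not Spark's actual `TaskRunner`/reaper code:

```python
import threading

class TaskRunner:
    """Illustrative: the runner flips `finished` while completing
    successfully, not at some later point, so a concurrent reaper that
    checks the flag never escalates against a task that already finished."""

    def __init__(self):
        self.finished = False
        self._lock = threading.Lock()

    def run(self):
        result = 42  # stand-in for the real task work
        with self._lock:
            self.finished = True  # flip the flag as part of success
        return result

def reaper_check(runner):
    # The reaper only escalates (e.g. logs a thread dump, kills the JVM)
    # when the task is NOT finished.
    with runner._lock:
        return "ok" if runner.finished else "escalate"

r = TaskRunner()
r.run()
print(reaper_check(r))  # "ok"
```

If the flag were flipped outside the success path (or not at all), the reaper could observe a completed-but-unmarked task and escalate spuriously, which is the confusing-logs symptom the commit title refers to.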
GulajavaMinistudio pushed a commit that referenced this pull request on Nov 15, 2019
…tions
### What changes were proposed in this pull request?
In order to avoid frequently changing the value of `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions`, we usually set it much larger than `spark.sql.shuffle.partitions` after enabling adaptive execution. This causes some bucket map joins to lose efficacy and adds more `ShuffleExchange` nodes.
How to reproduce:
```scala
val bucketedTableName = "bucketed_table"
spark.range(10000).write.bucketBy(500, "id").sortBy("id").mode(org.apache.spark.sql.SaveMode.Overwrite).saveAsTable(bucketedTableName)
val bucketedTable = spark.table(bucketedTableName)
val df = spark.range(8)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// Spark 2.4. spark.sql.adaptive.enabled=false
// We set spark.sql.shuffle.partitions <= 500 every time based on our data in this case.
spark.conf.set("spark.sql.shuffle.partitions", 500)
bucketedTable.join(df, "id").explain()
// Since 3.0. We enabled adaptive execution and set spark.sql.adaptive.shuffle.maxNumPostShufflePartitions to a larger value to fit more cases.
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", 1000)
bucketedTable.join(df, "id").explain()
```
```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
*(4) Project [id#5L]
+- *(4) SortMergeJoin [id#5L], [id#7L], Inner
:- *(1) Sort [id#5L ASC NULLS FIRST], false, 0
: +- *(1) Project [id#5L]
: +- *(1) Filter isnotnull(id#5L)
: +- *(1) ColumnarToRow
: +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
+- *(3) Sort [id#7L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#7L, 500), true, [id=#49]
+- *(2) Range (0, 8, step=1, splits=16)
```
vs
```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Project [id#5L]
+- SortMergeJoin [id#5L], [id#7L], Inner
:- Sort [id#5L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#5L, 1000), true, [id=#93]
: +- Project [id#5L]
: +- Filter isnotnull(id#5L)
: +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
+- Sort [id#7L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#7L, 1000), true, [id=#92]
+- Range (0, 8, step=1, splits=16)
```
This PR makes reading bucketed tables always obey `spark.sql.shuffle.partitions`, even when adaptive execution is enabled and `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` is set, to avoid adding more `ShuffleExchange` nodes.
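The planner condition at play can be sketched outside Spark: a bucketed scan behaves like a hash partitioning with `numBuckets` partitions, so it avoids a shuffle only when the planner's target partition count equals the bucket count. The function below is an illustrative model of that decision, not Spark's actual planner:

```python
def plan_side(is_bucketed, num_buckets, target_partitions):
    """Return the operators needed to bring one join side into the required
    hash distribution with `target_partitions` partitions."""
    if is_bucketed and num_buckets == target_partitions:
        return ["Sort"]  # bucketing already provides the distribution
    return ["Exchange", "Sort"]  # mismatch forces a shuffle

# 500 buckets, target 500 (spark.sql.shuffle.partitions):
# the bucketed side needs no exchange, matching the first plan above.
print(plan_side(True, 500, 500))   # ['Sort']

# Target 1000 (maxNumPostShufflePartitions): the bucketed side shuffles,
# matching the adaptive-execution plan above.
print(plan_side(True, 500, 1000))  # ['Exchange', 'Sort']
```

This is why pinning the bucketed side's target to `spark.sql.shuffle.partitions` keeps the bucket map join effective even with a large post-shuffle partition ceiling.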
### Why are the changes needed?
Do not degrade performance after enabling adaptive execution.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Unit test.
Closes apache#26409 from wangyum/SPARK-29655.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
GulajavaMinistudio pushed a commit that referenced this pull request on Oct 8, 2020
… more scenarios such as PartitioningCollection
### What changes were proposed in this pull request?
This PR proposes to improve `EnsureRequirements.reorderJoinKeys` to handle the following scenarios:
1. If the keys cannot be reordered to match the left-side `HashPartitioning`, consider the right-side `HashPartitioning`.
2. Handle `PartitioningCollection`, which may contain `HashPartitioning`.
### Why are the changes needed?
1. For scenario 1), the current behavior matches either the left-side `HashPartitioning` or the right-side `HashPartitioning`, but not both: if both sides are `HashPartitioning`, it will try to match only the left side.
The following will not consider the right-side `HashPartitioning`:
```
val df1 = (0 until 10).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 10).map(i => (i % 7, i % 11)).toDF("i2", "j2")
df1.write.format("parquet").bucketBy(4, "i1", "j1").saveAsTable("t1")
df2.write.format("parquet").bucketBy(4, "i2", "j2").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val join = t1.join(t2, t1("i1") === t2("j2") && t1("i1") === t2("i2"))
join.explain
== Physical Plan ==
*(5) SortMergeJoin [i1#26, i1#26], [j2#31, i2#30], Inner
:- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#69]
: +- *(1) Project [i1#26, j1#27]
: +- *(1) Filter isnotnull(i1#26)
: +- *(1) ColumnarToRow
: +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4
+- *(4) Sort [j2#31 ASC NULLS FIRST, i2#30 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(j2#31, i2#30, 4), true, [id=#79] <===== This can be removed
+- *(3) Project [i2#30, j2#31]
+- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30))
+- *(3) ColumnarToRow
+- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4
```
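The key-reordering idea behind the fix can be sketched outside Spark: the join keys come in left/right pairs, and we try to permute the pairs so that one side's keys line up with the partitioning expressions that side already satisfies — first the left side, and, per this PR, the right side as a fallback. Names and structure below are illustrative, not Spark's actual implementation:

```python
def reorder_join_keys(left_keys, right_keys, left_part, right_part):
    """Permute the paired join keys to match the left side's partitioning
    expressions; if that is impossible, try the right side's."""
    pairs = list(zip(left_keys, right_keys))

    def match(target_keys, side):
        # side=0 matches left keys against the target, side=1 the right keys.
        if not target_keys or len(target_keys) != len(pairs):
            return None
        remaining = list(pairs)
        ordered = []
        for expr in target_keys:
            hit = next((p for p in remaining if p[side] == expr), None)
            if hit is None:
                return None  # some partitioning expression has no join key
            ordered.append(hit)
            remaining.remove(hit)
        return ordered

    result = match(left_part, 0) or match(right_part, 1)
    if result is None:
        return left_keys, right_keys  # no reordering possible
    return [p[0] for p in result], [p[1] for p in result]

# Join condition t1.i1 = t2.j2 AND t1.i1 = t2.i2; only the right side is
# hash partitioned, on (i2, j2), so reorder to match the right side.
lk, rk = reorder_join_keys(["i1", "i1"], ["j2", "i2"],
                           left_part=[], right_part=["i2", "j2"])
print(lk, rk)  # ['i1', 'i1'] ['i2', 'j2']
```

With the right-side fallback, the example above yields join keys `[i1, i1], [i2, j2]` — the ordering in the fixed plan shown later — so the right side's existing partitioning is reused and its `Exchange` disappears.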
2. For scenario 2), the current behavior does not handle `PartitioningCollection`:
```
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("i2", "j2")
val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3")
val join = df1.join(df2, df1("i1") === df2("i2") && df1("j1") === df2("j2")) // PartitioningCollection
val join2 = join.join(df3, join("j1") === df3("j3") && join("i1") === df3("i3"))
join2.explain
== Physical Plan ==
*(9) SortMergeJoin [j1#8, i1#7], [j3#30, i3#29], Inner
:- *(6) Sort [j1#8 ASC NULLS FIRST, i1#7 ASC NULLS FIRST], false, 0 <===== This can be removed
: +- Exchange hashpartitioning(j1#8, i1#7, 5), true, [id=#58] <===== This can be removed
: +- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner
: :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#45]
: : +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8]
: : +- *(1) LocalTableScan [_1#2, _2#3]
: +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#51]
: +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19]
: +- *(3) LocalTableScan [_1#13, _2#14]
+- *(8) Sort [j3#30 ASC NULLS FIRST, i3#29 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(j3#30, i3#29, 5), true, [id=#64]
+- *(7) Project [_1#24 AS i3#29, _2#25 AS j3#30]
+- *(7) LocalTableScan [_1#24, _2#25]
```
### Does this PR introduce _any_ user-facing change?
Yes. In the examples above, the shuffle/sort nodes marked `This can be removed` are now removed:
1. Scenario 1):
```
== Physical Plan ==
*(4) SortMergeJoin [i1#26, i1#26], [i2#30, j2#31], Inner
:- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#67]
: +- *(1) Project [i1#26, j1#27]
: +- *(1) Filter isnotnull(i1#26)
: +- *(1) ColumnarToRow
: +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4
+- *(3) Sort [i2#30 ASC NULLS FIRST, j2#31 ASC NULLS FIRST], false, 0
+- *(3) Project [i2#30, j2#31]
+- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30))
+- *(3) ColumnarToRow
+- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4
```
2. Scenario 2):
```
== Physical Plan ==
*(8) SortMergeJoin [i1#7, j1#8], [i3#29, j3#30], Inner
:- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner
: :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#43]
: : +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8]
: : +- *(1) LocalTableScan [_1#2, _2#3]
: +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#49]
: +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19]
: +- *(3) LocalTableScan [_1#13, _2#14]
+- *(7) Sort [i3#29 ASC NULLS FIRST, j3#30 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(i3#29, j3#30, 5), true, [id=#58]
+- *(6) Project [_1#24 AS i3#29, _2#25 AS j3#30]
+- *(6) LocalTableScan [_1#24, _2#25]
```
### How was this patch tested?
Added tests.
Closes apache#29074 from imback82/reorder_keys.
Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
GulajavaMinistudio pushed a commit that referenced this pull request on Jan 18, 2024
### What changes were proposed in this pull request?
Purge the pip cache in the dockerfile.

### Why are the changes needed?
To save 4~5G of disk space.

Before (https://github.com/zhengruifeng/spark/actions/runs/7541725028/job/20530432798):
```
#45 [39/39] RUN df -h
#45 0.090 Filesystem  Size  Used Avail Use% Mounted on
#45 0.090 overlay      84G   70G   15G  83% /
#45 0.090 tmpfs        64M     0   64M   0% /dev
#45 0.090 shm          64M     0   64M   0% /dev/shm
#45 0.090 /dev/root    84G   70G   15G  83% /etc/resolv.conf
#45 0.090 tmpfs       7.9G     0  7.9G   0% /proc/acpi
#45 0.090 tmpfs       7.9G     0  7.9G   0% /sys/firmware
#45 0.090 tmpfs       7.9G     0  7.9G   0% /proc/scsi
#45 DONE 2.0s
```
After (https://github.com/zhengruifeng/spark/actions/runs/7549204209/job/20552796796):
```
#48 [42/43] RUN python3.12 -m pip cache purge
#48 0.670 Files removed: 392
#48 DONE 0.7s
#49 [43/43] RUN df -h
#49 0.075 Filesystem  Size  Used Avail Use% Mounted on
#49 0.075 overlay      84G   65G   19G  79% /
#49 0.075 tmpfs        64M     0   64M   0% /dev
#49 0.075 shm          64M     0   64M   0% /dev/shm
#49 0.075 /dev/root    84G   65G   19G  79% /etc/resolv.conf
#49 0.075 tmpfs       7.9G     0  7.9G   0% /proc/acpi
#49 0.075 tmpfs       7.9G     0  7.9G   0% /sys/firmware
#49 0.075 tmpfs       7.9G     0  7.9G   0% /proc/scsi
```

### Does this PR introduce _any_ user-facing change?
No, infra-only.

### How was this patch tested?
CI.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#44768 from zhengruifeng/infra_docker_cleanup.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>