Update upstream#51
Merged
GulajavaMinistudio merged 9 commits intoGulajavaMinistudio:masterfrom May 19, 2017
Merged
Conversation
…l before calling the getter
## What changes were proposed in this pull request?
GenerateUnsafeProjection.writeStructToBuffer() did not honor the assumption that the caller must make sure that a value is not null before using the getter. This could lead to various errors. This change fixes that behavior.
Example of code generated before:
```scala
/* 059 */ final UTF8String fieldName = value.getUTF8String(0);
/* 060 */ if (value.isNullAt(0)) {
/* 061 */ rowWriter1.setNullAt(0);
/* 062 */ } else {
/* 063 */ rowWriter1.write(0, fieldName);
/* 064 */ }
```
Example of code generated now:
```scala
/* 060 */ boolean isNull1 = value.isNullAt(0);
/* 061 */ UTF8String value1 = isNull1 ? null : value.getUTF8String(0);
/* 062 */ if (isNull1) {
/* 063 */ rowWriter1.setNullAt(0);
/* 064 */ } else {
/* 065 */ rowWriter1.write(0, value1);
/* 066 */ }
```
## How was this patch tested?
Adds GenerateUnsafeProjectionSuite.
Author: Ala Luszczak <ala@databricks.com>
Closes #18030 from ala/fix-generate-unsafe-projection.
…mber of fields Fix quadratic List indexing in ParquetWriteSupport. I noticed this function while profiling some code with today. It showed up as a significant factor in a table with twenty columns; with hundreds of columns, it could dominate any other function call. ## What changes were proposed in this pull request? The writeFields method iterates from 0 until number of fields, indexing into rootFieldWriters for each element. rootFieldWriters is a List, so indexing is a linear operation. The complexity of the writeFields method is thus quadratic in the number of fields. Solution: explicitly convert rootFieldWriters to Array (implicitly converted to WrappedArray) for constant-time indexing. ## How was this patch tested? This is a one-line change for performance reasons. Author: tpoterba <tpoterba@broadinstitute.org> Author: Tim Poterba <tpoterba@gmail.com> Closes #18005 from tpoterba/tpoterba-patch-1.
## What changes were proposed in this pull request? This PR update to two: 1.adds the new unit tests. testing would be performed when there is no shuffle stage, shuffle will not generate the data file and the index files. 2.Modify the '[SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file' unit test, parallelize is 1 but not is 2, Check the index file and delete. ## How was this patch tested? The new unit test. Author: caoxuewen <cao.xuewen@zte.com.cn> Closes #17868 from heary-cao/ShuffleSuite.
…pom.xml [https://issues.apache.org/jira/browse/SPARK-20759](https://issues.apache.org/jira/browse/SPARK-20759) SCALA_VERSION in _config.yml is 2.11.7, but 2.11.8 in pom.xml. So I think SCALA_VERSION in _config.yml should be consistent with pom.xml. Author: liuzhaokun <liu.zhaokun@zte.com.cn> Closes #17992 from liu-zhaokun/new.
## What changes were proposed in this pull request? Add built-in SQL Function - COT. ## How was this patch tested? unit tests Author: Yuming Wang <wgyumg@gmail.com> Closes #17999 from wangyum/SPARK-20751.
… which is not we expected.
## What changes were proposed in this pull request?
spark-sql>select month("1582-09-28");
spark-sql>10
For this case, the expected result is 9, but it is 10.
spark-sql>select day("1582-04-18");
spark-sql>28
For this case, the expected result is 18, but it is 28.
when the date before "1582-10-04", the function of `month` and `day` return the value which is not we expected.
## How was this patch tested?
unit tests
Author: liuxian <liu.xian3@zte.com.cn>
Closes #17997 from 10110346/wip_lx_0516.
## What changes were proposed in this pull request? Rename `carsDF` to `df` in SparkR `rollup` and `cube` examples. ## How was this patch tested? Manual tests. Author: zero323 <zero323@users.noreply.github.com> Closes #17988 from zero323/cube-docs.
## What changes were proposed in this pull request? Some examples in the DataFrame methods are syntactically wrong, even though they are pseudo code. Fix these and some style issues. Author: Wayne Zhang <actuaryzhang@uber.com> Closes #18003 from actuaryzhang/sparkRDoc3.
Update ML guide for migration `2.1` -> `2.2` and the previous version migration guide section. ## How was this patch tested? Build doc locally. Author: Nick Pentreath <nickp@za.ibm.com> Closes #17996 from MLnick/SPARK-20506-2.2-migration-guide.
GulajavaMinistudio
pushed a commit
that referenced
this pull request
Oct 8, 2020
… more scenarios such as PartitioningCollection
### What changes were proposed in this pull request?
This PR proposes to improve `EnsureRquirement.reorderJoinKeys` to handle the following scenarios:
1. If the keys cannot be reordered to match the left-side `HashPartitioning`, consider the right-side `HashPartitioning`.
2. Handle `PartitioningCollection`, which may contain `HashPartitioning`
### Why are the changes needed?
1. For the scenario 1), the current behavior matches either the left-side `HashPartitioning` or the right-side `HashPartitioning`. This means that if both sides are `HashPartitioning`, it will try to match only the left side.
The following will not consider the right-side `HashPartitioning`:
```
val df1 = (0 until 10).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 10).map(i => (i % 7, i % 11)).toDF("i2", "j2")
df1.write.format("parquet").bucketBy(4, "i1", "j1").saveAsTable("t1")df2.write.format("parquet").bucketBy(4, "i2", "j2").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val join = t1.join(t2, t1("i1") === t2("j2") && t1("i1") === t2("i2"))
join.explain
== Physical Plan ==
*(5) SortMergeJoin [i1#26, i1#26], [j2#31, i2#30], Inner
:- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#69]
: +- *(1) Project [i1#26, j1#27]
: +- *(1) Filter isnotnull(i1#26)
: +- *(1) ColumnarToRow
: +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4
+- *(4) Sort [j2#31 ASC NULLS FIRST, i2#30 ASC NULLS FIRST], false, 0.
+- Exchange hashpartitioning(j2#31, i2#30, 4), true, [id=#79]. <===== This can be removed
+- *(3) Project [i2#30, j2#31]
+- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30))
+- *(3) ColumnarToRow
+- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4
```
2. For the scenario 2), the current behavior does not handle `PartitioningCollection`:
```
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("i2", "j2")
val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3")
val join = df1.join(df2, df1("i1") === df2("i2") && df1("j1") === df2("j2")) // PartitioningCollection
val join2 = join.join(df3, join("j1") === df3("j3") && join("i1") === df3("i3"))
join2.explain
== Physical Plan ==
*(9) SortMergeJoin [j1#8, i1#7], [j3#30, i3#29], Inner
:- *(6) Sort [j1#8 ASC NULLS FIRST, i1#7 ASC NULLS FIRST], false, 0. <===== This can be removed
: +- Exchange hashpartitioning(j1#8, i1#7, 5), true, [id=#58] <===== This can be removed
: +- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner
: :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#45]
: : +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8]
: : +- *(1) LocalTableScan [_1#2, _2#3]
: +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#51]
: +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19]
: +- *(3) LocalTableScan [_1#13, _2#14]
+- *(8) Sort [j3#30 ASC NULLS FIRST, i3#29 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(j3#30, i3#29, 5), true, [id=#64]
+- *(7) Project [_1#24 AS i3#29, _2#25 AS j3#30]
+- *(7) LocalTableScan [_1#24, _2#25]
```
### Does this PR introduce _any_ user-facing change?
Yes, now from the above examples, the shuffle/sort nodes pointed by `This can be removed` are now removed:
1. Senario 1):
```
== Physical Plan ==
*(4) SortMergeJoin [i1#26, i1#26], [i2#30, j2#31], Inner
:- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#67]
: +- *(1) Project [i1#26, j1#27]
: +- *(1) Filter isnotnull(i1#26)
: +- *(1) ColumnarToRow
: +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4
+- *(3) Sort [i2#30 ASC NULLS FIRST, j2#31 ASC NULLS FIRST], false, 0
+- *(3) Project [i2#30, j2#31]
+- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30))
+- *(3) ColumnarToRow
+- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4
```
2. Scenario 2):
```
== Physical Plan ==
*(8) SortMergeJoin [i1#7, j1#8], [i3#29, j3#30], Inner
:- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner
: :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#43]
: : +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8]
: : +- *(1) LocalTableScan [_1#2, _2#3]
: +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#49]
: +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19]
: +- *(3) LocalTableScan [_1#13, _2#14]
+- *(7) Sort [i3#29 ASC NULLS FIRST, j3#30 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(i3#29, j3#30, 5), true, [id=#58]
+- *(6) Project [_1#24 AS i3#29, _2#25 AS j3#30]
+- *(6) LocalTableScan [_1#24, _2#25]
```
### How was this patch tested?
Added tests.
Closes apache#29074 from imback82/reorder_keys.
Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
GulajavaMinistudio
pushed a commit
that referenced
this pull request
Feb 25, 2021
… empty ### What changes were proposed in this pull request? This pr pushdown limit through InnerLike when condition is empty(Origin pr: apache#23104). For example: ```sql CREATE TABLE t1 using parquet AS SELECT id AS a, id AS b FROM range(2); CREATE TABLE t2 using parquet AS SELECT id AS d FROM range(2); SELECT * FROM t1 CROSS JOIN t2 LIMIT 10; ``` Before this pr: ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- CollectLimit 10 +- BroadcastNestedLoopJoin BuildRight, Cross :- FileScan parquet default.t1[a#5L,b#6L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/tg/f5mz46090wg7swzgdc69f8q03965_0/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint> +- BroadcastExchange IdentityBroadcastMode, [id=#43] +- FileScan parquet default.t2[d#7L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/tg/f5mz46090wg7swzgdc69f8q03965_0/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<d:bigint> ``` After this pr: ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- CollectLimit 10 +- BroadcastNestedLoopJoin BuildRight, Cross :- LocalLimit 10 : +- FileScan parquet default.t1[a#5L,b#6L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/tg/f5mz46090wg7swzgdc69f8q03965_0/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint> +- BroadcastExchange IdentityBroadcastMode, [id=#51] +- LocalLimit 10 +- FileScan parquet default.t2[d#7L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/tg/f5mz46090wg7swzgdc69f8q03965_0/T/warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<d:bigint> ``` ### Why are the changes needed? Improve query performance. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes apache#31567 from wangyum/SPARK-26138. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Yuming Wang <yumwang@ebay.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.