[SPARK-20961][SQL] generalize the dictionary in ColumnVector #68
Merged
GulajavaMinistudio merged 1 commit into GulajavaMinistudio:master on Jun 5, 2017
Conversation
## What changes were proposed in this pull request?

As the first step of https://issues.apache.org/jira/browse/SPARK-20960, to make `ColumnVector` public, this PR generalizes `ColumnVector.dictionary` so that it is not coupled with Parquet.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18183 from cloud-fan/dictionary.
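To give a sense of what "generalizing the dictionary" can look like, here is a rough Scala sketch of such an abstraction: a format-agnostic dictionary interface plus an adapter that confines the Parquet dependency to one class. The names and method set below are illustrative, not necessarily the exact ones introduced by the patch.

```scala
import org.apache.parquet.column.{Dictionary => ParquetColumnDictionary}

// Format-agnostic dictionary: ColumnVector only needs "decode id -> value".
trait Dictionary {
  def decodeToInt(id: Int): Int
  def decodeToLong(id: Int): Long
  def decodeToFloat(id: Int): Float
  def decodeToDouble(id: Int): Double
  def decodeToBinary(id: Int): Array[Byte]
}

// Adapter that keeps the Parquet coupling in a single place.
class ParquetDictionaryAdapter(dict: ParquetColumnDictionary) extends Dictionary {
  override def decodeToInt(id: Int): Int = dict.decodeToInt(id)
  override def decodeToLong(id: Int): Long = dict.decodeToLong(id)
  override def decodeToFloat(id: Int): Float = dict.decodeToFloat(id)
  override def decodeToDouble(id: Int): Double = dict.decodeToDouble(id)
  override def decodeToBinary(id: Int): Array[Byte] = dict.decodeToBinary(id).getBytes
}
```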
GulajavaMinistudio pushed a commit that referenced this pull request on Mar 29, 2020
…coerce to nullable type

### What changes were proposed in this pull request?

This PR makes a non-nullable null type not coerce to a nullable type in complex types. Non-nullable fields in a struct, elements in an array and entries in a map can mean an empty array, struct or map, so there is no need to force nullability when we find common types. This PR also reverts and supersedes apache@d7b97a1

### Why are the changes needed?

To make type coercion coherent and consistent. Currently, we correctly keep the nullability even between non-nullable fields:

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.range(1).select(array(lit(1)).cast(ArrayType(IntegerType, false))).printSchema()
spark.range(1).select(array(lit(1)).cast(ArrayType(DoubleType, false))).printSchema()
```

```scala
spark.range(1).selectExpr("concat(array(1), array(1)) as arr").printSchema()
```

### Does this PR introduce any user-facing change?

Yes.

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.range(1).select(array().cast(ArrayType(IntegerType, false))).printSchema()
```

```scala
spark.range(1).selectExpr("concat(array(), array(1)) as arr").printSchema()
```

**Before:**

```
org.apache.spark.sql.AnalysisException: cannot resolve 'array()' due to data type mismatch: cannot cast array<null> to array<int>;;
'Project [cast(array() as array<int>) AS array()#68]
+- Range (0, 1, step=1, splits=Some(12))
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:149)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:140)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:333)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:333)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
```

```
root
 |-- arr: array (nullable = false)
 |    |-- element: integer (containsNull = true)
```

**After:**

```
root
 |-- array(): array (nullable = false)
 |    |-- element: integer (containsNull = false)
```

```
root
 |-- arr: array (nullable = false)
 |    |-- element: integer (containsNull = false)
```

### How was this patch tested?

Unit tests were added and manually tested.

Closes apache#27991 from HyukjinKwon/SPARK-31227.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
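As a side note on the examples in that commit: the element nullability can also be checked programmatically rather than by reading `printSchema` output. A minimal sketch; the expected value comes from the "After" output quoted above:

```scala
import org.apache.spark.sql.types.ArrayType

// Per the "After" output above, this prints containsNull = false.
val schema = spark.range(1).selectExpr("concat(array(), array(1)) as arr").schema
schema("arr").dataType match {
  case ArrayType(_, containsNull) => println(s"containsNull = $containsNull")
  case other                      => println(s"unexpected type: $other")
}
```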
GulajavaMinistudio pushed a commit that referenced this pull request on Jan 15, 2021
…join can be planned as broadcast join
### What changes were proposed in this pull request?
LeftSemi/LeftAnti joins should not be pushed down through an Aggregate in some cases, for example:
```scala
spark.range(50000000L).selectExpr("id % 10000 as a", "id % 10000 as b").write.saveAsTable("t1")
spark.range(40000000L).selectExpr("id % 8000 as c", "id % 8000 as d").write.saveAsTable("t2")
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain
```
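For context, Catalyst rewrites the INTERSECT above into a left semi join between the two distinct aggregates, which is where the LeftSemi join in the plans below comes from. A rough SQL-level sketch of that rewrite (not the exact Catalyst output):

```scala
// Roughly what ReplaceIntersectWithSemiJoin produces for the query above:
// a null-safe (<=>) left semi join between the two DISTINCT subqueries.
spark.sql("""
  SELECT DISTINCT a, b
  FROM (SELECT DISTINCT a, b FROM t1) l
  LEFT SEMI JOIN (SELECT DISTINCT c, d FROM t2) r
  ON l.a <=> r.c AND l.b <=> r.d
""").explain
```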
Before this pr:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- HashAggregate(keys=[a#16L, b#17L], functions=[])
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#72]
            +- HashAggregate(keys=[a#16L, b#17L], functions=[])
               +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
                  :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#65]
                  :     +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
                  +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#66]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#61]
                              +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                                 +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
```
After this pr:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#74]
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
            :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#67]
            :     +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :        +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#61]
            :           +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :              +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
            +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#68]
                  +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                     +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#63]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
```
### Why are the changes needed?
1. Pushing LeftSemi/LeftAnti down through an Aggregate can hurt performance: in the plan above, the semi join has to process all of `t1` instead of its much smaller distinct result.
2. It removes a user-added DISTINCT operator, e.g.: [q38](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q38.sql), [q87](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q87.sql).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test and benchmark test.
SQL | Before this PR (seconds) | After this PR (seconds)
-- | -- | --
q14a | 660 | 594
q14b | 660 | 600
q38 | 55 | 29
q87 | 66 | 35
Before this pr:

After this pr:

Closes apache#31145 from wangyum/SPARK-34081.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
GulajavaMinistudio pushed a commit that referenced this pull request on Aug 4, 2021
…query
### What changes were proposed in this pull request?
Remove redundant aliases after `RewritePredicateSubquery`. For example:
```scala
sql("CREATE TABLE t1 USING parquet AS SELECT id AS a, id AS b, id AS c FROM range(10)")
sql("CREATE TABLE t2 USING parquet AS SELECT id AS x, id AS y FROM range(8)")
sql(
  """
    |SELECT *
    |FROM t1
    |WHERE a IN (SELECT x
    |            FROM (SELECT x AS x,
    |                         Rank() OVER (partition BY x ORDER BY Sum(y) DESC) AS ranking
    |                  FROM t2
    |                  GROUP BY x) tmp1
    |            WHERE ranking <= 5)
    |""".stripMargin).explain
```
Before this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [a#10L], [x#7L], LeftSemi, BuildRight, false
   :- FileScan parquet default.t1[a#10L,b#11L,c#12L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#68]
      +- Project [x#7L]
         +- Filter (ranking#8 <= 5)
            +- Window [rank(_w2#25L) windowspecdefinition(x#15L, _w2#25L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ranking#8], [x#15L], [_w2#25L DESC NULLS LAST]
               +- Sort [x#15L ASC NULLS FIRST, _w2#25L DESC NULLS LAST], false, 0
                  +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=#62]
                     +- HashAggregate(keys=[x#15L], functions=[sum(y#16L)])
                        +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=#59]
                           +- HashAggregate(keys=[x#15L], functions=[partial_sum(y#16L)])
                              +- FileScan parquet default.t2[x#15L,y#16L]
```
After this PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [a#10L], [x#15L], LeftSemi, BuildRight, false
   :- FileScan parquet default.t1[a#10L,b#11L,c#12L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#67]
      +- Project [x#15L]
         +- Filter (ranking#8 <= 5)
            +- Window [rank(_w2#25L) windowspecdefinition(x#15L, _w2#25L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ranking#8], [x#15L], [_w2#25L DESC NULLS LAST]
               +- Sort [x#15L ASC NULLS FIRST, _w2#25L DESC NULLS LAST], false, 0
                  +- HashAggregate(keys=[x#15L], functions=[sum(y#16L)])
                     +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=#59]
                        +- HashAggregate(keys=[x#15L], functions=[partial_sum(y#16L)])
                           +- FileScan parquet default.t2[x#15L,y#16L]
```
### Why are the changes needed?
Reduce shuffles to improve query performance: as the plans above show, removing the redundant alias lets the planner drop the extra Exchange (`[id=#62]`) in the subquery branch. This change can benefit TPC-DS q70.
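One quick way to see the difference is to count shuffle exchanges in a query's physical plan string. A minimal sketch, assuming the `t1`/`t2` tables from the example above exist and the SELECT statement is held in a hypothetical `query` string:

```scala
// Hypothetical helper: count shuffle exchanges in a query's physical plan.
// `query` is assumed to hold the SELECT ... WHERE a IN (...) statement above.
def countShuffles(query: String): Int = {
  val planText = spark.sql(query).queryExecution.executedPlan.toString
  planText.split("\n").count(_.contains("Exchange hashpartitioning"))
}
// Before this PR the example plan contains two such exchanges; after, one.
```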
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes apache#33509 from wangyum/SPARK-36280.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>