-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-25121][SQL] Supports multi-part relation names for join strategy hint resolution #27935
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Not ready for reviews. |
|
Test build #119924 has finished for PR 27935 at commit
|
b276c1c to
9a0c68c
Compare
|
Test build #119929 has finished for PR 27935 at commit
|
|
Test build #119931 has finished for PR 27935 at commit
|
| // For example, in a query `SELECT /* BROADCAST(default.t) */ * FROM default.t JOIN t`, | ||
| // the broadcast hint will match the left-side table only, `default.t`. | ||
| // | ||
| // 3. otherwise, no match happens. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I re-read your comments again in #22198 and summarized up them above. If I misunderstand something, please let me know. @dongjoon-hyun @cloud-fan
|
Test build #119963 has finished for PR 27935 at commit
|
| // | ||
| // 1. they match if an identifier in a hint only has one part and it is the same with | ||
| // a relation name in a query. If a relation has a namespace (`db1.t`), we just ignore it. | ||
| // For example, in a query `SELECT /* BROADCAST(t) */ * FROM db1.t JOIN t`, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this the existing behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @maryannxue as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, I checked queries below in v2.4.5;
// v2.4.5
scala> sql("CREATE DATABASE db1")
scala> sql("CREATE TABLE db1.t(key int)")
scala> sql("CREATE TABLE t(key int)")
scala> sql("""SELECT /*+ MAPJOIN(t) */ * FROM db1.t JOIN t""")
== Parsed Logical Plan ==
'UnresolvedHint MAPJOIN, ['t]
+- 'Project [*]
+- 'Join Inner
:- 'UnresolvedRelation `db1`.`t`
+- 'UnresolvedRelation `t`
== Analyzed Logical Plan ==
key: int, key: int
Project [key#20, key#21]
+- Join Inner
:- ResolvedHint (broadcast)
: +- SubqueryAlias `db1`.`t`
: +- HiveTableRelation `db1`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#20]
+- ResolvedHint (broadcast)
+- SubqueryAlias `default`.`t`
+- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#21]
| // For example, in a query `SELECT /*+ BROADCAST(t) */ * FROM db1.t JOIN t`, | ||
| // the broadcast hint will match both tables, `db1.t` and `t`. | ||
| // | ||
| // 2. they match if an identifier in a hint has two parts and it is the same with |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the identifier has more than 2 parts like cata.ns1.ns2.tbl ? How about we define a simple rule: If identInHint is a tail of identInQuery?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, that looks nice. I'll update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about the latest update?
|
Test build #119987 has finished for PR 27935 at commit
|
|
Test build #120012 has finished for PR 27935 at commit
|
|
Test build #120011 has finished for PR 27935 at commit
|
|
Test build #120013 has finished for PR 27935 at commit
|
|
|
||
| package org.apache.spark.sql.catalyst | ||
|
|
||
| trait BaseIdentifier { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we add a base trait?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, we can remove it now. I'll update.
| plan: LogicalPlan, | ||
| relations: mutable.HashSet[String], | ||
| relationsInHint: Seq[Seq[String]], | ||
| appliedRelations: mutable.ArrayBuffer[Seq[String]], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't care which relations are matched, but which relation name specified by hint does not have a match.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about relationsInHintWithMatch: mutable.HashSet[Seq[String]]
and in code
relationsInHint.find(matchedIdentifier(_, ident)).map { relation =>
relationsInHintWithMatch += relation
plan with hint applied
}.getOrElse {
originalPlan
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But, without this variable, how do we track non-used hints for the error report in hintErrorHandler.hintRelationsNotFound?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated in the latest commit.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
Show resolved
Hide resolved
| if relations.exists(resolver(_, ident.last)) => | ||
| relations.remove(ident.last) | ||
| if relationsInHint.exists(matchedIdentifier(_, ident)) => | ||
| relationsInHintWithMatch += ident |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem here is, ident is the actual relation name, not the relation name in the hint. This forces us to do an extra case insensitive match in https://github.com/apache/spark/pull/27935/files#diff-746a6d090224c7cfbe15daa27fa27408R163
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Urr, I see. I'll fix that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since your suggested code above broke some existing tests, I modified it a little based on that. Updated in the latest commit.
| } | ||
| } | ||
|
|
||
| test("broadcast hint on temp view") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems this test suite is only for permanent views. Maybe we can put everything in one test in https://github.com/apache/spark/pull/27935/files#diff-fa1d044f9cfe587e27866393fe18fd46R329
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved this test into DataFrameJoinSuite.
cloud-fan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except one comment
|
Test build #120043 has finished for PR 27935 at commit
|
|
Test build #120052 has finished for PR 27935 at commit
|
|
Test build #120056 has finished for PR 27935 at commit
|
| val plan = sql(s"SELECT * FROM $dbName.$table1Name, $dbName.$table2Name " + | ||
| s"WHERE $table1Name.id = $table2Name.id") | ||
| .queryExecution.executedPlan | ||
| assert(plan.collect { case p: BroadcastHashJoinExec => p }.isEmpty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checkIfHintNotApplied?
- .queryExecution.executedPlan
- assert(plan.collect { case p: BroadcastHashJoinExec => p }.isEmpty)
+ checkIfHintNotApplied(plan)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah... I forgot to remove the old tests... I can remove it. Thanks!
| checkIfHintApplied(sqlTemplate(s"$dbName.$table1Name", s"$dbName.$table1Name")) | ||
| checkIfHintApplied(sqlTemplate(s"$dbName.$table1Name", table1Name)) | ||
| checkIfHintNotApplied(sqlTemplate(table1Name, s"$dbName.$table1Name")) | ||
| checkIfHintNotApplied(sqlTemplate(s"$dbName.$table1Name", s"$dbName.$table1Name.id")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is $dbName.$table1Name.id used as a negative value for hintTableName? It's a little confusing because there is a catalog concept. For three fields, can we use catalog instead of id?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I will modify id -> spark_catalog.
| withTempView("tv") { | ||
| sql(s"CREATE TEMPORARY VIEW tv AS SELECT * FROM $dbName.$table1Name") | ||
| checkIfHintApplied(sqlTemplate("tv", "tv")) | ||
| checkIfHintNotApplied(sqlTemplate("tv", "default.tv")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might be misleading. Technically, this is the same with sqlTemplate("tv", "non_exist") because we cannot use a database qualifier for Temporary View.
scala> sql("select * from default.tv").show
org.apache.spark.sql.AnalysisException: Table or view not found: default.tv; line 1 pos 14;There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, but I think a query with a hint having a non-existent relation identifier should work?;
scala> sql("create table t1 (id int)")
scala> sql("create table t2 (id int)")
scala> sql("create temporary view tv as select * from t1")
scala> sql("SELECT /*+ BROADCASTJOIN(default.non_exist) */ * FROM tv, t2 WHERE tv.id = t2.id").explain(true)
20/03/20 07:34:02 WARN HintErrorLogger: Count not find relation 'default.non_exist' specified in hint 'BROADCASTJOIN(default.non_exist)'.
== Parsed Logical Plan ==
'UnresolvedHint BROADCASTJOIN, ['default.non_exist]
+- 'Project [*]
+- 'Filter ('tv.id = 't2.id)
+- 'Join Inner
:- 'UnresolvedRelation [tv]
+- 'UnresolvedRelation [t2]
== Analyzed Logical Plan ==
id: int, id: int
Project [id#0, id#1]
+- Filter (id#0 = id#1)
+- Join Inner
:- SubqueryAlias tv
: +- Project [id#0]
: +- SubqueryAlias spark_catalog.default.t1
: +- Relation[id#0] parquet
+- SubqueryAlias spark_catalog.default.t2
+- Relation[id#1] parquet
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will modify the tests a little.
| // | ||
| // For example, | ||
| // * in a query `SELECT /*+ BROADCAST(t) */ * FROM db1.t JOIN t`, | ||
| // the broadcast hint will match both tables, `db1.t` and `t`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- the broadcast hint will match both tables, `db1.t` and `t`.
+ the broadcast hint will match both tables, `db1.t` and `t`, even when the current db is `db2`.
| // local temp table (single-part identifier case) | ||
| checkAnalysis( | ||
| UnresolvedHint("MAPJOIN", Seq("table", "table2"), | ||
| table("table").join(table("table2"))), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The following will be better because this is a caseSensitive = false test case.
- table("table").join(table("table2"))),
+ table("TaBlE").join(table("TaBlE2"))),There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, yes!
|
Thanks for the reviews, @dongjoon-hyun! I've updated, so could you check the latest commit? |
| val plan = sql(s"SELECT * FROM $dbName.$table1Name, $dbName.$table2Name " + | ||
| s"WHERE $table1Name.id = $table2Name.id") | ||
| .queryExecution.executedPlan | ||
| assert(plan.collect { case p: BroadcastHashJoinExec => p }.isEmpty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, is this pre-testing removed completely?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I saw that your comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, I think the current test coverage is good enough.
|
Test build #120074 has finished for PR 27935 at commit
|
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM.
|
Merged to master/3.0. Thank you, @maropu and @cloud-fan . |
… resolution
### What changes were proposed in this pull request?
This pr fixed code to respect a database name for broadcast table hint resolution.
Currently, spark ignores a database name in multi-part names;
```
scala> sql("CREATE DATABASE testDb")
scala> spark.range(10).write.saveAsTable("testDb.t")
// without this patch
scala> spark.range(10).join(spark.table("testDb.t"), "id").hint("broadcast", "testDb.t").explain
== Physical Plan ==
*(2) Project [id#24L]
+- *(2) BroadcastHashJoin [id#24L], [id#26L], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
: +- *(1) Range (0, 10, step=1, splits=4)
+- *(2) Project [id#26L]
+- *(2) Filter isnotnull(id#26L)
+- *(2) FileScan parquet testdb.t[id#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-2.3.1-bin-hadoop2.7/spark-warehouse..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
// with this patch
scala> spark.range(10).join(spark.table("testDb.t"), "id").hint("broadcast", "testDb.t").explain
== Physical Plan ==
*(2) Project [id#3L]
+- *(2) BroadcastHashJoin [id#3L], [id#5L], Inner, BuildRight
:- *(2) Range (0, 10, step=1, splits=4)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- *(1) Project [id#5L]
+- *(1) Filter isnotnull(id#5L)
+- *(1) FileScan parquet testdb.t[id#5L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/testdb.db/t], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
```
This PR comes from #22198
### Why are the changes needed?
For better usability.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added unit tests.
Closes #27935 from maropu/SPARK-25121-2.
Authored-by: Takeshi Yamamuro <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit ca499e9)
Signed-off-by: Dongjoon Hyun <[email protected]>
|
Thanks for the reviews, @dongjoon-hyun @cloud-fan ! |
|
@maropu Could you fix the title and PR description? Add the test cases for the other join hints? Thanks! |
|
Sure, I'll do that. Thanks! |
…ifiers in join strategy hints ### What changes were proposed in this pull request? This pr intends to add unit tests for the other join hints (`MERGEJOIN`, `SHUFFLE_HASH`, and `SHUFFLE_REPLICATE_NL`). This is a followup PR of #27935. ### Why are the changes needed? For better test coverage. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added unit tests. Closes #28013 from maropu/SPARK-25121-FOLLOWUP. Authored-by: Takeshi Yamamuro <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…ifiers in join strategy hints ### What changes were proposed in this pull request? This pr intends to add unit tests for the other join hints (`MERGEJOIN`, `SHUFFLE_HASH`, and `SHUFFLE_REPLICATE_NL`). This is a followup PR of #27935. ### Why are the changes needed? For better test coverage. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added unit tests. Closes #28013 from maropu/SPARK-25121-FOLLOWUP. Authored-by: Takeshi Yamamuro <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit da49f50) Signed-off-by: Dongjoon Hyun <[email protected]>
… resolution
### What changes were proposed in this pull request?
This pr fixed code to respect a database name for broadcast table hint resolution.
Currently, spark ignores a database name in multi-part names;
```
scala> sql("CREATE DATABASE testDb")
scala> spark.range(10).write.saveAsTable("testDb.t")
// without this patch
scala> spark.range(10).join(spark.table("testDb.t"), "id").hint("broadcast", "testDb.t").explain
== Physical Plan ==
*(2) Project [id#24L]
+- *(2) BroadcastHashJoin [id#24L], [id#26L], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
: +- *(1) Range (0, 10, step=1, splits=4)
+- *(2) Project [id#26L]
+- *(2) Filter isnotnull(id#26L)
+- *(2) FileScan parquet testdb.t[id#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-2.3.1-bin-hadoop2.7/spark-warehouse..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
// with this patch
scala> spark.range(10).join(spark.table("testDb.t"), "id").hint("broadcast", "testDb.t").explain
== Physical Plan ==
*(2) Project [id#3L]
+- *(2) BroadcastHashJoin [id#3L], [id#5L], Inner, BuildRight
:- *(2) Range (0, 10, step=1, splits=4)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- *(1) Project [id#5L]
+- *(1) Filter isnotnull(id#5L)
+- *(1) FileScan parquet testdb.t[id#5L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/testdb.db/t], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
```
This PR comes from apache#22198
### Why are the changes needed?
For better usability.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added unit tests.
Closes apache#27935 from maropu/SPARK-25121-2.
Authored-by: Takeshi Yamamuro <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…ifiers in join strategy hints ### What changes were proposed in this pull request? This pr intends to add unit tests for the other join hints (`MERGEJOIN`, `SHUFFLE_HASH`, and `SHUFFLE_REPLICATE_NL`). This is a followup PR of apache#27935. ### Why are the changes needed? For better test coverage. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added unit tests. Closes apache#28013 from maropu/SPARK-25121-FOLLOWUP. Authored-by: Takeshi Yamamuro <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
This pr fixed code for respecting a multi-part identifier (e.g.,
dbname.tablename) for join strategy hint resolution. For example, the master ignores a database name in a hint parameter;This PR comes from #22198
Why are the changes needed?
For better usability.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added unit tests.