@tejasapatil
Contributor

What changes were proposed in this pull request?

Currently, for bucketed and sorted tables, Sort Merge Join does not use the table's bucketing and sort metadata to avoid unnecessary operations (e.g. Exchange and Sort). This PR adds that support.

  • Populated sort ordering in *DataSourceScanExec for bucketed tables
  • Fixed a bug related to comparing ordering in EnsureRequirements
val df1 = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k")
df1.write.format("orc").partitionBy("i").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table7")
df1.write.format("orc").partitionBy("i").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
hc.sql("SELECT * FROM table7 a JOIN table8 b ON a.j=b.j AND a.k=b.k AND a.i=b.i AND a.i=2 AND b.i=2").explain(true)

Before:

== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, Some((((('a.j = 'b.j) && ('a.k = 'b.k)) && ('a.i = 'b.i)) && (('a.i = 2) && ('b.i = 2))))
   :- 'UnresolvedRelation `table7`, Some(a)
   +- 'UnresolvedRelation `table8`, Some(b)

== Analyzed Logical Plan ==
j: int, k: string, i: int, j: int, k: string, i: int
Project [j#20,k#21,i#22,j#23,k#24,i#25]
+- Join Inner, Some(((((j#20 = j#23) && (k#21 = k#24)) && (i#22 = i#25)) && ((i#22 = 2) && (i#25 = 2))))
   :- SubqueryAlias a
   :  +- SubqueryAlias table7
   :     +- Relation[j#20,k#21,i#22] orc
   +- SubqueryAlias b
      +- SubqueryAlias table8
         +- Relation[j#23,k#24,i#25] orc

== Optimized Logical Plan ==
Join Inner, Some((((j#20 = j#23) && (k#21 = k#24)) && (i#22 = i#25)))
:- Filter (((isnotnull(k#21) && isnotnull(j#20)) && isnotnull(i#22)) && (i#22 = 2))
:  +- Relation[j#20,k#21,i#22] orc
+- Filter (((isnotnull(k#24) && isnotnull(j#23)) && isnotnull(i#25)) && (i#25 = 2))
   +- Relation[j#23,k#24,i#25] orc

== Physical Plan ==
WholeStageCodegen
:  +- SortMergeJoin [j#20,k#21,i#22], [j#23,k#24,i#25], Inner, None
:     :- INPUT
:     +- INPUT
:- WholeStageCodegen
:  :  +- Sort [j#20 ASC,k#21 ASC,i#22 ASC], false, 0
:  :     +- INPUT
:  +- Exchange hashpartitioning(j#20, k#21, i#22, 200), None
:     +- WholeStageCodegen
:        :  +- Project [j#20,k#21,i#22]
:        :     +- Filter (isnotnull(k#21) && isnotnull(j#20))
:        :        +- Scan orc default.table7[j#20,k#21,i#22] Format: ORC, InputPaths: file:/XXXX/table7, PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<j:int,k:string>
+- WholeStageCodegen
   :  +- Sort [j#23 ASC,k#24 ASC,i#25 ASC], false, 0
   :     +- INPUT
   +- Exchange hashpartitioning(j#23, k#24, i#25, 200), None
      +- WholeStageCodegen
         :  +- Project [j#23,k#24,i#25]
         :     +- Filter (isnotnull(k#24) && isnotnull(j#23))
         :        +- Scan orc default.table8[j#23,k#24,i#25] Format: ORC, InputPaths: file:/XXXX/table8, PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<j:int,k:string>

After:

== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, Some((((('a.j = 'b.j) && ('a.k = 'b.k)) && ('a.i = 'b.i)) && (('a.i = 2) && ('b.i = 2))))
   :- 'UnresolvedRelation `table7`, Some(a)
   +- 'UnresolvedRelation `table8`, Some(b)

== Analyzed Logical Plan ==
j: int, k: string, i: int, j: int, k: string, i: int
Project [j#139,k#140,i#141,j#142,k#143,i#144]
+- Join Inner, Some(((((j#139 = j#142) && (k#140 = k#143)) && (i#141 = i#144)) && ((i#141 = 2) && (i#144 = 2))))
   :- SubqueryAlias a
   :  +- SubqueryAlias table7
   :     +- Relation[j#139,k#140,i#141] orc
   +- SubqueryAlias b
      +- SubqueryAlias table8
         +- Relation[j#142,k#143,i#144] orc

== Optimized Logical Plan ==
Join Inner, Some((((j#139 = j#142) && (k#140 = k#143)) && (i#141 = i#144)))
:- Filter (((isnotnull(k#140) && isnotnull(j#139)) && isnotnull(i#141)) && (i#141 = 2))
:  +- Relation[j#139,k#140,i#141] orc
+- Filter (((isnotnull(k#143) && isnotnull(j#142)) && isnotnull(i#144)) && (i#144 = 2))
   +- Relation[j#142,k#143,i#144] orc

== Physical Plan ==
WholeStageCodegen
:  +- SortMergeJoin [j#139,k#140,i#141], [j#142,k#143,i#144], Inner, None
:     :- INPUT
:     +- INPUT
:- WholeStageCodegen
:  :  +- Project [j#139,k#140,i#141]
:  :     +- Filter (isnotnull(k#140) && isnotnull(j#139))
:  :        +- Scan orc default.table7[j#139,k#140,i#141] Format: ORC, InputPaths: file:/XXXX/table7, PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<j:int,k:string>
+- WholeStageCodegen
   :  +- Project [j#142,k#143,i#144]
   :     +- Filter (isnotnull(k#143) && isnotnull(j#142))
   :        +- Scan orc default.table8[j#142,k#143,i#144] Format: ORC, InputPaths: file:/XXXX/table8, PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<j:int,k:string>
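The difference between the two physical plans rests on one property of bucketed, sorted tables: rows with equal bucketing keys land in the same bucket number on both sides, and each bucket is already sorted. The following is a minimal in-memory sketch of that idea, not the code in this patch. `BucketedJoinSketch` and all of its members are hypothetical names, and the `hashCode`-based bucket function is an assumption for illustration; it is not the hash Spark uses for bucketing.

```scala
// Toy model of a bucketed sort-merge join on in-memory data.
// Every name here is hypothetical; this is not Spark's implementation.
object BucketedJoinSketch {
  type Key = (Int, String) // join key, e.g. columns (j, k)
  type Row = (Key, String) // key plus a payload value

  // Assumed bucket function: non-negative modulo of the key's hashCode.
  def bucketId(key: Key, numBuckets: Int): Int =
    ((key.hashCode % numBuckets) + numBuckets) % numBuckets

  // "Write" a table: group rows by bucket and sort each bucket by key.
  def writeBucketed(rows: Seq[Row], numBuckets: Int): Map[Int, Vector[Row]] =
    rows.groupBy(r => bucketId(r._1, numBuckets))
      .map { case (b, rs) => b -> rs.sortBy(_._1).toVector }

  // Merge join of two inputs that are already sorted by key.
  def mergeJoin(left: Vector[Row], right: Vector[Row]): Vector[(Key, String, String)] = {
    val ord = implicitly[Ordering[Key]]
    val out = Vector.newBuilder[(Key, String, String)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val c = ord.compare(left(i)._1, right(j)._1)
      if (c < 0) i += 1
      else if (c > 0) j += 1
      else {
        val key = left(i)._1
        // Find the end of the run of equal keys on each side.
        val iEnd = left.indexWhere(_._1 != key, i) match { case -1 => left.length; case n => n }
        val jEnd = right.indexWhere(_._1 != key, j) match { case -1 => right.length; case n => n }
        // Emit every pairing of the two runs.
        for (a <- i until iEnd; b <- j until jEnd) out += ((key, left(a)._2, right(b)._2))
        i = iEnd
        j = jEnd
      }
    }
    out.result()
  }

  // Join bucket b of the left table only with bucket b of the right table:
  // no repartitioning (Exchange) and no re-sorting (Sort) of either side.
  def joinBucketed(
      left: Map[Int, Vector[Row]],
      right: Map[Int, Vector[Row]],
      numBuckets: Int): Vector[(Key, String, String)] =
    (0 until numBuckets).toVector.flatMap { b =>
      mergeJoin(left.getOrElse(b, Vector.empty), right.getOrElse(b, Vector.empty))
    }
}
```

The sketch only works because both tables use the same bucket count and the same bucket function; if either differs, matching keys may sit in different buckets and one side would have to be repartitioned.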

How was this patch tested?

I have tested for correctness with a small dataset. I am in the process of writing a test case.

@tejasapatil
Contributor Author

ok to test

@tejasapatil changed the title from "[SPARK-15453] [SQL] SMB Join for datasource" to "[SPARK-15453] [SQL] Sort Merge Join to use bucketing metadata to optimize query plan" on May 20, 2016
@SparkQA

SparkQA commented May 20, 2016

Test build #59026 has finished for PR 13231 at commit 9e2bb18.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented May 21, 2016

This looks good at a high level. I'd separate the bug fix from the feature improvement and submit a PR for the bug fix with a test case. We should merge that bug fix into 2.0.

The feature improvement should probably go into 2.1. In the future we can also introduce a merge sort operator that combines the multiple sorted files in a single partition and then performs the merge join without the extra full sort.
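A merge step like the one described above can be pictured as a k-way merge over runs that are each already sorted. The sketch below is one way to do that with a min-heap, under the assumption that each file in a partition can be read as a sorted iterator. `KWayMergeSketch` and `mergeSorted` are hypothetical names; this is not an existing Spark operator.

```scala
import scala.collection.mutable

// Hypothetical sketch of a k-way merge; not an existing Spark operator.
object KWayMergeSketch {
  // Merge several individually sorted runs (e.g. the sorted files of one
  // partition) into one sorted iterator without re-sorting all rows.
  def mergeSorted[A](runs: Seq[Iterator[A]])(implicit ord: Ordering[A]): Iterator[A] = {
    val indexed = runs.toIndexedSeq
    // PriorityQueue is a max-heap, so reverse the ordering to get a min-heap
    // keyed on the head element of each run.
    val heap = mutable.PriorityQueue.empty[(A, Int)](
      Ordering.by[(A, Int), A](_._1).reverse)
    // Seed the heap with the first element of every non-empty run.
    indexed.zipWithIndex.foreach { case (it, idx) =>
      if (it.hasNext) heap.enqueue((it.next(), idx))
    }
    new Iterator[A] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): A = {
        // Take the smallest head, then refill from the run it came from.
        val (value, idx) = heap.dequeue()
        val it = indexed(idx)
        if (it.hasNext) heap.enqueue((it.next(), idx))
        value
      }
    }
  }
}
```

With k runs and n rows in total, this does O(n log k) comparisons and keeps only k elements in memory at a time, compared with O(n log n) for a full sort of the concatenated input.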

@tejasapatil
Contributor Author

Will re-open when I am ready

@viirya
Member

viirya commented Aug 26, 2016

@tejasapatil any chance you can update it soon? If not, I am interested in implementing it.

@tejasapatil
Contributor Author

@viirya: I spent some time on this today and got a working version: tejasapatil@a17b167

I need to polish it and verify that it does not cause any regressions.

@viirya
Member

viirya commented Aug 26, 2016

@tejasapatil OK. I will implement the Filter "stop if false" function.

@tejasapatil
Contributor Author

Continuing this work in a new PR: #14864

@sharmabhaskar

sharmabhaskar commented Jun 12, 2019

@tejasapatil I am facing the same issue while joining two bucketed tables. I am using Spark 2.2 on the MapR distribution.
I have two tables:
table A: bucketed on key_column (20 buckets)
table B: partitioned on year and bucketed on key_column (20 buckets)

But when joining the two tables on key_column, the query still does both a sort and an exchange:
[count#1311L])
+- *Project
+- *SortMergeJoin [key_column#1079], [key_column#1218],Inner
sort step: :- *Sort [key_column#1079 ASC NULLS FIRST], false, 0
exchange step: : +- Exchange hashpartitioning(key_column#1079, 200)
: +- *Filter isnotnull(key_column#1079)
