Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -349,14 +349,11 @@ object JoinReorderDP extends PredicateHelper with Logging {
}

def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
if (other.planCost.card == 0 || other.planCost.size == 0) {
false
} else {
val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
relativeRows * conf.joinReorderCardWeight +
relativeSize * (1 - conf.joinReorderCardWeight) < 1
}
val thisCost = BigDecimal(this.planCost.card) * conf.joinReorderCardWeight +
BigDecimal(this.planCost.size) * (1 - conf.joinReorderCardWeight)
val otherCost = BigDecimal(other.planCost.card) * conf.joinReorderCardWeight +
BigDecimal(other.planCost.size) * (1 - conf.joinReorderCardWeight)
thisCost < otherCost
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.optimizer.JoinReorderDP.JoinPlan
import org.apache.spark.sql.catalyst.plans.{Cross, Inner}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
Expand Down Expand Up @@ -363,4 +364,18 @@ class JoinReorderSuite extends JoinReorderPlanTestBase with StatsEstimationTestB

assertEqualJoinPlans(Optimize, originalPlan3, bestPlan3)
}

test("SPARK-33935: betterThan should be consistent") {
val plan1 = JoinPlan(null, null, null, Cost(300, 80))
val plan2 = JoinPlan(null, null, null, Cost(500, 30))

// cost1 = 300*0.7 + 80*0.3 = 234
// cost2 = 500*0.7 + 30*0.3 = 359

assert(!plan1.betterThan(plan1, conf))
assert(!plan2.betterThan(plan2, conf))

assert(plan1.betterThan(plan2, conf))
assert(!plan2.betterThan(plan1, conf))
Comment on lines +378 to +379
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding this. This fix looks fine to me. cc: @cloud-fan @HyukjinKwon

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,12 @@ class StarJoinCostBasedReorderSuite extends JoinReorderPlanTestBase with StatsEs
(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))

val expected =
f1.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
.join(t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1"))), Inner,
Some(nameToAttr("t3_c1") === nameToAttr("t4_c1")))
t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1")))
.join(t1.join(t2, Inner, Some(nameToAttr("t1_c1") === nameToAttr("t2_c1"))), Inner,
Some(nameToAttr("t1_c2") === nameToAttr("t4_c2")))
.join(f1
.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk"))))
.select(outputsOf(d1, t1, t2, t3, t4, f1, d2): _*)

assertEqualJoinPlans(Optimize, query, expected)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
+- * HashAggregate (36)
+- * Project (35)
+- * BroadcastHashJoin Inner BuildRight (34)
:- * Project (28)
: +- * BroadcastHashJoin Inner BuildRight (27)
:- * Project (29)
: +- * BroadcastHashJoin Inner BuildRight (28)
: :- * Project (22)
: : +- * BroadcastHashJoin Inner BuildRight (21)
: : :- * Project (15)
Expand All @@ -27,16 +27,16 @@
: : +- * Project (19)
: : +- * Filter (18)
: : +- * ColumnarToRow (17)
: : +- Scan parquet default.date_dim (16)
: +- BroadcastExchange (26)
: +- * Filter (25)
: +- * ColumnarToRow (24)
: +- Scan parquet default.store (23)
: : +- Scan parquet default.customer_address (16)
: +- BroadcastExchange (27)
: +- * Project (26)
: +- * Filter (25)
: +- * ColumnarToRow (24)
: +- Scan parquet default.date_dim (23)
+- BroadcastExchange (33)
+- * Project (32)
+- * Filter (31)
+- * ColumnarToRow (30)
+- Scan parquet default.customer_address (29)
+- * Filter (32)
+- * ColumnarToRow (31)
+- Scan parquet default.store (30)


(1) Scan parquet default.store_sales
Expand Down Expand Up @@ -107,94 +107,94 @@ Join condition: (((((((cd_marital_status#12 = M) AND (cd_education_status#13 = A
Output [7]: [ss_sold_date_sk#1, ss_addr_sk#4, ss_store_sk#5, ss_quantity#6, ss_ext_sales_price#8, ss_ext_wholesale_cost#9, ss_net_profit#10]
Input [13]: [ss_sold_date_sk#1, ss_hdemo_sk#3, ss_addr_sk#4, ss_store_sk#5, ss_quantity#6, ss_sales_price#7, ss_ext_sales_price#8, ss_ext_wholesale_cost#9, ss_net_profit#10, cd_marital_status#12, cd_education_status#13, hd_demo_sk#15, hd_dep_count#16]

(16) Scan parquet default.date_dim
Output [2]: [d_date_sk#18, d_year#19]
(16) Scan parquet default.customer_address
Output [3]: [ca_address_sk#18, ca_state#19, ca_country#20]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2001), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int>
Location [not included in comparison]/{warehouse_dir}/customer_address]
PushedFilters: [IsNotNull(ca_country), EqualTo(ca_country,United States), IsNotNull(ca_address_sk), Or(Or(In(ca_state, [TX,OH]),In(ca_state, [OR,NM,KY])),In(ca_state, [VA,TX,MS]))]
ReadSchema: struct<ca_address_sk:int,ca_state:string,ca_country:string>

(17) ColumnarToRow [codegen id : 3]
Input [2]: [d_date_sk#18, d_year#19]
Input [3]: [ca_address_sk#18, ca_state#19, ca_country#20]

(18) Filter [codegen id : 3]
Input [2]: [d_date_sk#18, d_year#19]
Condition : ((isnotnull(d_year#19) AND (d_year#19 = 2001)) AND isnotnull(d_date_sk#18))
Input [3]: [ca_address_sk#18, ca_state#19, ca_country#20]
Condition : (((isnotnull(ca_country#20) AND (ca_country#20 = United States)) AND isnotnull(ca_address_sk#18)) AND ((ca_state#19 IN (TX,OH) OR ca_state#19 IN (OR,NM,KY)) OR ca_state#19 IN (VA,TX,MS)))

(19) Project [codegen id : 3]
Output [1]: [d_date_sk#18]
Input [2]: [d_date_sk#18, d_year#19]
Output [2]: [ca_address_sk#18, ca_state#19]
Input [3]: [ca_address_sk#18, ca_state#19, ca_country#20]

(20) BroadcastExchange
Input [1]: [d_date_sk#18]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#20]
Input [2]: [ca_address_sk#18, ca_state#19]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#21]

(21) BroadcastHashJoin [codegen id : 6]
Left keys [1]: [ss_sold_date_sk#1]
Right keys [1]: [d_date_sk#18]
Join condition: None
Left keys [1]: [ss_addr_sk#4]
Right keys [1]: [ca_address_sk#18]
Join condition: ((((ca_state#19 IN (TX,OH) AND (ss_net_profit#10 >= 100.00)) AND (ss_net_profit#10 <= 200.00)) OR ((ca_state#19 IN (OR,NM,KY) AND (ss_net_profit#10 >= 150.00)) AND (ss_net_profit#10 <= 300.00))) OR ((ca_state#19 IN (VA,TX,MS) AND (ss_net_profit#10 >= 50.00)) AND (ss_net_profit#10 <= 250.00)))

(22) Project [codegen id : 6]
Output [6]: [ss_addr_sk#4, ss_store_sk#5, ss_quantity#6, ss_ext_sales_price#8, ss_ext_wholesale_cost#9, ss_net_profit#10]
Input [8]: [ss_sold_date_sk#1, ss_addr_sk#4, ss_store_sk#5, ss_quantity#6, ss_ext_sales_price#8, ss_ext_wholesale_cost#9, ss_net_profit#10, d_date_sk#18]
Output [5]: [ss_sold_date_sk#1, ss_store_sk#5, ss_quantity#6, ss_ext_sales_price#8, ss_ext_wholesale_cost#9]
Input [9]: [ss_sold_date_sk#1, ss_addr_sk#4, ss_store_sk#5, ss_quantity#6, ss_ext_sales_price#8, ss_ext_wholesale_cost#9, ss_net_profit#10, ca_address_sk#18, ca_state#19]

(23) Scan parquet default.store
Output [1]: [s_store_sk#21]
(23) Scan parquet default.date_dim
Output [2]: [d_date_sk#22, d_year#23]
Batched: true
Location [not included in comparison]/{warehouse_dir}/store]
PushedFilters: [IsNotNull(s_store_sk)]
ReadSchema: struct<s_store_sk:int>
Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2001), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int>

(24) ColumnarToRow [codegen id : 4]
Input [1]: [s_store_sk#21]
Input [2]: [d_date_sk#22, d_year#23]

(25) Filter [codegen id : 4]
Input [1]: [s_store_sk#21]
Condition : isnotnull(s_store_sk#21)
Input [2]: [d_date_sk#22, d_year#23]
Condition : ((isnotnull(d_year#23) AND (d_year#23 = 2001)) AND isnotnull(d_date_sk#22))

(26) BroadcastExchange
Input [1]: [s_store_sk#21]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#22]
(26) Project [codegen id : 4]
Output [1]: [d_date_sk#22]
Input [2]: [d_date_sk#22, d_year#23]

(27) BroadcastHashJoin [codegen id : 6]
Left keys [1]: [ss_store_sk#5]
Right keys [1]: [s_store_sk#21]
(27) BroadcastExchange
Input [1]: [d_date_sk#22]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#24]

(28) BroadcastHashJoin [codegen id : 6]
Left keys [1]: [ss_sold_date_sk#1]
Right keys [1]: [d_date_sk#22]
Join condition: None

(28) Project [codegen id : 6]
Output [5]: [ss_addr_sk#4, ss_quantity#6, ss_ext_sales_price#8, ss_ext_wholesale_cost#9, ss_net_profit#10]
Input [7]: [ss_addr_sk#4, ss_store_sk#5, ss_quantity#6, ss_ext_sales_price#8, ss_ext_wholesale_cost#9, ss_net_profit#10, s_store_sk#21]
(29) Project [codegen id : 6]
Output [4]: [ss_store_sk#5, ss_quantity#6, ss_ext_sales_price#8, ss_ext_wholesale_cost#9]
Input [6]: [ss_sold_date_sk#1, ss_store_sk#5, ss_quantity#6, ss_ext_sales_price#8, ss_ext_wholesale_cost#9, d_date_sk#22]

(29) Scan parquet default.customer_address
Output [3]: [ca_address_sk#23, ca_state#24, ca_country#25]
(30) Scan parquet default.store
Output [1]: [s_store_sk#25]
Batched: true
Location [not included in comparison]/{warehouse_dir}/customer_address]
PushedFilters: [IsNotNull(ca_country), EqualTo(ca_country,United States), IsNotNull(ca_address_sk), Or(Or(In(ca_state, [TX,OH]),In(ca_state, [OR,NM,KY])),In(ca_state, [VA,TX,MS]))]
ReadSchema: struct<ca_address_sk:int,ca_state:string,ca_country:string>

(30) ColumnarToRow [codegen id : 5]
Input [3]: [ca_address_sk#23, ca_state#24, ca_country#25]
Location [not included in comparison]/{warehouse_dir}/store]
PushedFilters: [IsNotNull(s_store_sk)]
ReadSchema: struct<s_store_sk:int>

(31) Filter [codegen id : 5]
Input [3]: [ca_address_sk#23, ca_state#24, ca_country#25]
Condition : (((isnotnull(ca_country#25) AND (ca_country#25 = United States)) AND isnotnull(ca_address_sk#23)) AND ((ca_state#24 IN (TX,OH) OR ca_state#24 IN (OR,NM,KY)) OR ca_state#24 IN (VA,TX,MS)))
(31) ColumnarToRow [codegen id : 5]
Input [1]: [s_store_sk#25]

(32) Project [codegen id : 5]
Output [2]: [ca_address_sk#23, ca_state#24]
Input [3]: [ca_address_sk#23, ca_state#24, ca_country#25]
(32) Filter [codegen id : 5]
Input [1]: [s_store_sk#25]
Condition : isnotnull(s_store_sk#25)

(33) BroadcastExchange
Input [2]: [ca_address_sk#23, ca_state#24]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#26]
Input [1]: [s_store_sk#25]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#26]

(34) BroadcastHashJoin [codegen id : 6]
Left keys [1]: [ss_addr_sk#4]
Right keys [1]: [ca_address_sk#23]
Join condition: ((((ca_state#24 IN (TX,OH) AND (ss_net_profit#10 >= 100.00)) AND (ss_net_profit#10 <= 200.00)) OR ((ca_state#24 IN (OR,NM,KY) AND (ss_net_profit#10 >= 150.00)) AND (ss_net_profit#10 <= 300.00))) OR ((ca_state#24 IN (VA,TX,MS) AND (ss_net_profit#10 >= 50.00)) AND (ss_net_profit#10 <= 250.00)))
Left keys [1]: [ss_store_sk#5]
Right keys [1]: [s_store_sk#25]
Join condition: None

(35) Project [codegen id : 6]
Output [3]: [ss_quantity#6, ss_ext_sales_price#8, ss_ext_wholesale_cost#9]
Input [7]: [ss_addr_sk#4, ss_quantity#6, ss_ext_sales_price#8, ss_ext_wholesale_cost#9, ss_net_profit#10, ca_address_sk#23, ca_state#24]
Input [5]: [ss_store_sk#5, ss_quantity#6, ss_ext_sales_price#8, ss_ext_wholesale_cost#9, s_store_sk#25]

(36) HashAggregate [codegen id : 6]
Input [3]: [ss_quantity#6, ss_ext_sales_price#8, ss_ext_wholesale_cost#9]
Expand All @@ -205,7 +205,7 @@ Results [7]: [sum#34, count#35, sum#36, count#37, sum#38, count#39, sum#40]

(37) Exchange
Input [7]: [sum#34, count#35, sum#36, count#37, sum#38, count#39, sum#40]
Arguments: SinglePartition, true, [id=#41]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#41]

(38) HashAggregate [codegen id : 7]
Input [7]: [sum#34, count#35, sum#36, count#37, sum#38, count#39, sum#40]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ WholeStageCodegen (7)
WholeStageCodegen (6)
HashAggregate [ss_quantity,ss_ext_sales_price,ss_ext_wholesale_cost] [sum,count,sum,count,sum,count,sum,sum,count,sum,count,sum,count,sum]
Project [ss_quantity,ss_ext_sales_price,ss_ext_wholesale_cost]
BroadcastHashJoin [ss_addr_sk,ca_address_sk,ca_state,ss_net_profit]
Project [ss_addr_sk,ss_quantity,ss_ext_sales_price,ss_ext_wholesale_cost,ss_net_profit]
BroadcastHashJoin [ss_store_sk,s_store_sk]
Project [ss_addr_sk,ss_store_sk,ss_quantity,ss_ext_sales_price,ss_ext_wholesale_cost,ss_net_profit]
BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
BroadcastHashJoin [ss_store_sk,s_store_sk]
Project [ss_store_sk,ss_quantity,ss_ext_sales_price,ss_ext_wholesale_cost]
BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
Project [ss_sold_date_sk,ss_store_sk,ss_quantity,ss_ext_sales_price,ss_ext_wholesale_cost]
BroadcastHashJoin [ss_addr_sk,ca_address_sk,ca_state,ss_net_profit]
Project [ss_sold_date_sk,ss_addr_sk,ss_store_sk,ss_quantity,ss_ext_sales_price,ss_ext_wholesale_cost,ss_net_profit]
BroadcastHashJoin [ss_hdemo_sk,hd_demo_sk,cd_marital_status,cd_education_status,ss_sales_price,hd_dep_count]
Project [ss_sold_date_sk,ss_hdemo_sk,ss_addr_sk,ss_store_sk,ss_quantity,ss_sales_price,ss_ext_sales_price,ss_ext_wholesale_cost,ss_net_profit,cd_marital_status,cd_education_status]
Expand All @@ -35,23 +35,23 @@ WholeStageCodegen (7)
InputAdapter
BroadcastExchange #4
WholeStageCodegen (3)
Project [d_date_sk]
Filter [d_year,d_date_sk]
Project [ca_address_sk,ca_state]
Filter [ca_country,ca_address_sk,ca_state]
ColumnarToRow
InputAdapter
Scan parquet default.date_dim [d_date_sk,d_year]
Scan parquet default.customer_address [ca_address_sk,ca_state,ca_country]
InputAdapter
BroadcastExchange #5
WholeStageCodegen (4)
Filter [s_store_sk]
ColumnarToRow
InputAdapter
Scan parquet default.store [s_store_sk]
Project [d_date_sk]
Filter [d_year,d_date_sk]
ColumnarToRow
InputAdapter
Scan parquet default.date_dim [d_date_sk,d_year]
InputAdapter
BroadcastExchange #6
WholeStageCodegen (5)
Project [ca_address_sk,ca_state]
Filter [ca_country,ca_address_sk,ca_state]
ColumnarToRow
InputAdapter
Scan parquet default.customer_address [ca_address_sk,ca_state,ca_country]
Filter [s_store_sk]
ColumnarToRow
InputAdapter
Scan parquet default.store [s_store_sk]
Loading