@@ -348,12 +348,30 @@ object JoinReorderDP extends PredicateHelper with Logging {
}
}

/**
* To identify the plan with smaller computational cost,
* we use the weighted geometric mean of the ratio of rows and the ratio of sizes in bytes.
*
* There are other ways to combine these values into a cost comparison function.
* We experimented with some of them, and they gave worse results
* than the current one:
* 1) Weighted arithmetic mean of these two ratios - adding up fractions puts
* less emphasis on ratios between 0 and 1. Ratios 10 and 0.1 should be considered
* to be equally strong evidence in opposite directions. The arithmetic mean of these
* would be heavily biased towards the 10.
* 2) Absolute cost (cost = weight * rowCount + (1 - weight) * size) - when adding up
* two numeric measurements that have different units we can easily end up with one
* overwhelming the other.
*/
def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
val thisCost = BigDecimal(this.planCost.card) * conf.joinReorderCardWeight +
BigDecimal(this.planCost.size) * (1 - conf.joinReorderCardWeight)
val otherCost = BigDecimal(other.planCost.card) * conf.joinReorderCardWeight +
BigDecimal(other.planCost.size) * (1 - conf.joinReorderCardWeight)
thisCost < otherCost
if (other.planCost.card == 0 || other.planCost.size == 0) {
false
} else {
Member

How about leaving some comments about why we need to use relative values here?

Contributor

+1

Contributor Author

Added some comments to this method.

val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
Math.pow(relativeRows.doubleValue, conf.joinReorderCardWeight) *
Math.pow(relativeSize.doubleValue, 1 - conf.joinReorderCardWeight) < 1
}
}
}
}
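
The two rejected alternatives listed in the `betterThan` comment can be checked numerically. The following is a minimal sketch, not code from the patch: `MeanComparisonSketch` and its three helpers are hypothetical names, and the row and byte counts in `main` are made up purely for illustration.

```scala
// Illustrative only: hypothetical helpers, not part of Spark.
object MeanComparisonSketch {
  // Alternative 1 in the comment: weighted arithmetic mean of the two ratios.
  def arithmetic(rowRatio: Double, sizeRatio: Double, weight: Double): Double =
    weight * rowRatio + (1 - weight) * sizeRatio

  // The combination the patch adopts: weighted geometric mean of the two ratios.
  def geometric(rowRatio: Double, sizeRatio: Double, weight: Double): Double =
    math.pow(rowRatio, weight) * math.pow(sizeRatio, 1 - weight)

  // Alternative 2 in the comment: absolute cost mixing a row count with a byte count.
  def absolute(rows: Double, bytes: Double, weight: Double): Double =
    weight * rows + (1 - weight) * bytes

  def main(args: Array[String]): Unit = {
    // Ratios 10 and 0.1 are equally strong evidence in opposite directions.
    // The arithmetic mean lands near 5, the geometric mean near 1 (a tie).
    println(arithmetic(10.0, 0.1, 0.5))
    println(geometric(10.0, 0.1, 0.5))

    // Made-up plans: X has 10x the rows of Y but is about 1% smaller in bytes.
    val (xRows, xBytes) = (1e6, 1.00e9)
    val (yRows, yBytes) = (1e5, 1.01e9)
    // Absolute cost: the byte term swamps the row term, so X looks cheaper.
    println(absolute(xRows, xBytes, 0.7) < absolute(yRows, yBytes, 0.7))
    // Geometric mean of ratios: well above 1, so X is not better than Y.
    println(geometric(xRows / yRows, xBytes / yBytes, 0.7))
  }
}
```

Because the geometric mean works on ratios, rescaling either measurement (for example, counting sizes in kilobytes instead of bytes) leaves the result unchanged, which is not true of the absolute cost.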
@@ -1926,8 +1926,10 @@ object SQLConf {
val JOIN_REORDER_CARD_WEIGHT =
buildConf("spark.sql.cbo.joinReorder.card.weight")
.internal()
.doc("The weight of cardinality (number of rows) for plan cost comparison in join reorder: " +
"rows * weight + size * (1 - weight).")
.doc("The weight of the ratio of cardinalities (number of rows) " +
"in the cost comparison function. The ratio of sizes in bytes has weight " +
"1 - this value. The weighted geometric mean of these ratios is used to decide " +
"which of the candidate plans will be chosen by the CBO.")
.version("2.2.0")
.doubleConf
.checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
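
Written out, the comparison this description refers to is the inequality below. The symbols are notation introduced here for illustration and do not appear in the patch: $r$ is a plan's row count, $s$ its size in bytes, and $w$ the value of `spark.sql.cbo.joinReorder.card.weight`. The plan labelled "this" is judged better than "other" when the inequality holds, and never when $r_{\text{other}}$ or $s_{\text{other}}$ is zero.

```latex
\[
\left(\frac{r_{\text{this}}}{r_{\text{other}}}\right)^{w}
\left(\frac{s_{\text{this}}}{s_{\text{other}}}\right)^{1-w} < 1
\]
```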
@@ -369,9 +369,6 @@ class JoinReorderSuite extends JoinReorderPlanTestBase with StatsEstimationTestB
val plan1 = JoinPlan(null, null, null, Cost(300, 80))
val plan2 = JoinPlan(null, null, null, Cost(500, 30))

// cost1 = 300*0.7 + 80*0.3 = 234
// cost2 = 500*0.7 + 30*0.3 = 359

assert(!plan1.betterThan(plan1, conf))
assert(!plan2.betterThan(plan2, conf))
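
For the two plans in this test, the old and new comparisons can be worked through side by side. This is a standalone sketch, assuming the weight of 0.7 implied by the removed `300*0.7 + 80*0.3` comments. `Cost`, `absoluteCost`, `relativeCost` and `betterThan` below are simplified stand-ins that take the weight as a parameter instead of a `SQLConf`; they are not the Spark classes.

```scala
// Simplified stand-ins for illustration; these are not the Spark classes.
object TestPlanSketch {
  final case class Cost(card: BigInt, size: BigInt)

  // Old rule (removed lines): weighted sum of row count and size.
  def absoluteCost(c: Cost, weight: Double): BigDecimal =
    BigDecimal(c.card) * weight + BigDecimal(c.size) * (1 - weight)

  // New rule: weighted geometric mean of the row ratio and the size ratio.
  def relativeCost(a: Cost, b: Cost, weight: Double): Double = {
    val relativeRows = BigDecimal(a.card) / BigDecimal(b.card)
    val relativeSize = BigDecimal(a.size) / BigDecimal(b.size)
    math.pow(relativeRows.doubleValue, weight) *
      math.pow(relativeSize.doubleValue, 1 - weight)
  }

  // Same shape as the patched betterThan, including the guard against zero divisors.
  def betterThan(a: Cost, b: Cost, weight: Double): Boolean =
    if (b.card == BigInt(0) || b.size == BigInt(0)) false
    else relativeCost(a, b, weight) < 1

  def main(args: Array[String]): Unit = {
    val weight = 0.7 // assumption: taken from the removed test comments
    val plan1 = Cost(300, 80)
    val plan2 = Cost(500, 30)
    println(absoluteCost(plan1, weight) < absoluteCost(plan2, weight)) // old rule
    println(relativeCost(plan1, plan2, weight))                        // new rule's value
    println(betterThan(plan1, plan2, weight))
    println(betterThan(plan1, plan1, weight)) // a plan is never better than itself
  }
}
```

With these numbers the row ratio is 0.6 and the size ratio is about 2.67, so the weighted geometric mean comes out a little below 1 (about 0.94). Both rules therefore prefer `plan1`, and comparing a plan with itself gives a ratio of exactly 1, which is not strictly less than 1.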

@@ -294,12 +294,13 @@ class StarJoinCostBasedReorderSuite extends JoinReorderPlanTestBase with StatsEs
(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))

val expected =
t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1")))
f1
.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
.join(t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1"))), Inner,
Some(nameToAttr("d1_c2") === nameToAttr("t4_c1")))
.join(t1.join(t2, Inner, Some(nameToAttr("t1_c1") === nameToAttr("t2_c1"))), Inner,
Some(nameToAttr("t1_c2") === nameToAttr("t4_c2")))
.join(f1
.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk"))))
.select(outputsOf(d1, t1, t2, t3, t4, f1, d2): _*)

assertEqualJoinPlans(Optimize, query, expected)
@@ -23,15 +23,15 @@
: : : +- * Project (8)
: : : +- * Filter (7)
: : : +- * ColumnarToRow (6)
: : : +- Scan parquet default.household_demographics (5)
: : : +- Scan parquet default.date_dim (5)
: : +- * Project (15)
: : +- * Filter (14)
: : +- * ColumnarToRow (13)
: : +- Scan parquet default.store (12)
: +- * Project (22)
: +- * Filter (21)
: +- * ColumnarToRow (20)
: +- Scan parquet default.date_dim (19)
: +- Scan parquet default.household_demographics (19)
+- * Filter (32)
+- * ColumnarToRow (31)
+- Scan parquet default.customer (30)
@@ -54,96 +54,96 @@ Condition : ((isnotnull(ss_store_sk#3) AND isnotnull(ss_hdemo_sk#2)) AND isnotnu

(4) BroadcastExchange
Input [5]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_store_sk#3, ss_ticket_number#4, ss_sold_date_sk#5]
Arguments: HashedRelationBroadcastMode(List(cast(input[1, int, false] as bigint)),false), [id=#6]
Arguments: HashedRelationBroadcastMode(List(cast(input[4, int, true] as bigint)),false), [id=#6]

(5) Scan parquet default.household_demographics
Output [4]: [hd_demo_sk#7, hd_buy_potential#8, hd_dep_count#9, hd_vehicle_count#10]
(5) Scan parquet default.date_dim
Output [3]: [d_date_sk#7, d_year#8, d_dom#9]
Batched: true
Location [not included in comparison]/{warehouse_dir}/household_demographics]
PushedFilters: [IsNotNull(hd_vehicle_count), IsNotNull(hd_dep_count), Or(EqualTo(hd_buy_potential,>10000 ),EqualTo(hd_buy_potential,Unknown )), GreaterThan(hd_vehicle_count,0), IsNotNull(hd_demo_sk)]
ReadSchema: struct<hd_demo_sk:int,hd_buy_potential:string,hd_dep_count:int,hd_vehicle_count:int>
Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_dom), GreaterThanOrEqual(d_dom,1), LessThanOrEqual(d_dom,2), In(d_year, [1998,1999,2000]), In(d_date_sk, [2451790,2451119,2451180,2451454,2450874,2450906,2450967,2451485,2451850,2451514,2451270,2451758,2451028,2451546,2450997,2450996,2451393,2451667,2451453,2451819,2450905,2451331,2451577,2451089,2451301,2451545,2451605,2451851,2451181,2451149,2451820,2451362,2451392,2451240,2450935,2451637,2451484,2451058,2451300,2451727,2451759,2450815,2451698,2451150,2451332,2451606,2451666,2451211,2450846,2450875,2450966,2450936,2451361,2451212,2451880,2451059,2451789,2451423,2451576,2450816,2451088,2451728,2451027,2451120,2451881,2451697,2450847,2451271,2451636,2451515,2451424,2451239]), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int,d_dom:int>

(6) ColumnarToRow
Input [4]: [hd_demo_sk#7, hd_buy_potential#8, hd_dep_count#9, hd_vehicle_count#10]
Input [3]: [d_date_sk#7, d_year#8, d_dom#9]

(7) Filter
Input [4]: [hd_demo_sk#7, hd_buy_potential#8, hd_dep_count#9, hd_vehicle_count#10]
Condition : (((((isnotnull(hd_vehicle_count#10) AND isnotnull(hd_dep_count#9)) AND ((hd_buy_potential#8 = >10000 ) OR (hd_buy_potential#8 = Unknown ))) AND (hd_vehicle_count#10 > 0)) AND ((cast(hd_dep_count#9 as double) / cast(hd_vehicle_count#10 as double)) > 1.0)) AND isnotnull(hd_demo_sk#7))
Input [3]: [d_date_sk#7, d_year#8, d_dom#9]
Condition : (((((isnotnull(d_dom#9) AND (d_dom#9 >= 1)) AND (d_dom#9 <= 2)) AND d_year#8 IN (1998,1999,2000)) AND d_date_sk#7 INSET (2451790,2451119,2451180,2451454,2450874,2450906,2450967,2451485,2451850,2451514,2451270,2451758,2451028,2451546,2450997,2450996,2451393,2451667,2451453,2451819,2450905,2451331,2451577,2451089,2451301,2451545,2451605,2451851,2451181,2451149,2451820,2451362,2451392,2451240,2450935,2451637,2451484,2451058,2451300,2451727,2451759,2450815,2451698,2451150,2451332,2451606,2451666,2451211,2450846,2450875,2450966,2450936,2451361,2451212,2451880,2451059,2451789,2451423,2451576,2450816,2451088,2451728,2451027,2451120,2451881,2451697,2450847,2451271,2451636,2451515,2451424,2451239)) AND isnotnull(d_date_sk#7))

(8) Project
Output [1]: [hd_demo_sk#7]
Input [4]: [hd_demo_sk#7, hd_buy_potential#8, hd_dep_count#9, hd_vehicle_count#10]
Output [1]: [d_date_sk#7]
Input [3]: [d_date_sk#7, d_year#8, d_dom#9]

(9) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [ss_hdemo_sk#2]
Right keys [1]: [hd_demo_sk#7]
Left keys [1]: [ss_sold_date_sk#5]
Right keys [1]: [d_date_sk#7]
Join condition: None

(10) Project [codegen id : 2]
Output [4]: [ss_customer_sk#1, ss_store_sk#3, ss_ticket_number#4, ss_sold_date_sk#5]
Input [6]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_store_sk#3, ss_ticket_number#4, ss_sold_date_sk#5, hd_demo_sk#7]
Output [4]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_store_sk#3, ss_ticket_number#4]
Input [6]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_store_sk#3, ss_ticket_number#4, ss_sold_date_sk#5, d_date_sk#7]

(11) BroadcastExchange
Input [4]: [ss_customer_sk#1, ss_store_sk#3, ss_ticket_number#4, ss_sold_date_sk#5]
Arguments: HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)),false), [id=#11]
Input [4]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_store_sk#3, ss_ticket_number#4]
Arguments: HashedRelationBroadcastMode(List(cast(input[2, int, true] as bigint)),false), [id=#10]

(12) Scan parquet default.store
Output [2]: [s_store_sk#12, s_county#13]
Output [2]: [s_store_sk#11, s_county#12]
Batched: true
Location [not included in comparison]/{warehouse_dir}/store]
PushedFilters: [In(s_county, [Fairfield County,Ziebach County,Bronx County,Barrow County]), IsNotNull(s_store_sk)]
ReadSchema: struct<s_store_sk:int,s_county:string>

(13) ColumnarToRow
Input [2]: [s_store_sk#12, s_county#13]
Input [2]: [s_store_sk#11, s_county#12]

(14) Filter
Input [2]: [s_store_sk#12, s_county#13]
Condition : (s_county#13 IN (Fairfield County,Ziebach County,Bronx County,Barrow County) AND isnotnull(s_store_sk#12))
Input [2]: [s_store_sk#11, s_county#12]
Condition : (s_county#12 IN (Fairfield County,Ziebach County,Bronx County,Barrow County) AND isnotnull(s_store_sk#11))

(15) Project
Output [1]: [s_store_sk#12]
Input [2]: [s_store_sk#12, s_county#13]
Output [1]: [s_store_sk#11]
Input [2]: [s_store_sk#11, s_county#12]

(16) BroadcastHashJoin [codegen id : 3]
Left keys [1]: [ss_store_sk#3]
Right keys [1]: [s_store_sk#12]
Right keys [1]: [s_store_sk#11]
Join condition: None

(17) Project [codegen id : 3]
Output [3]: [ss_customer_sk#1, ss_ticket_number#4, ss_sold_date_sk#5]
Input [5]: [ss_customer_sk#1, ss_store_sk#3, ss_ticket_number#4, ss_sold_date_sk#5, s_store_sk#12]
Output [3]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_ticket_number#4]
Input [5]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_store_sk#3, ss_ticket_number#4, s_store_sk#11]

(18) BroadcastExchange
Input [3]: [ss_customer_sk#1, ss_ticket_number#4, ss_sold_date_sk#5]
Arguments: HashedRelationBroadcastMode(List(cast(input[2, int, true] as bigint)),false), [id=#14]
Input [3]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_ticket_number#4]
Arguments: HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)),false), [id=#13]

(19) Scan parquet default.date_dim
Output [3]: [d_date_sk#15, d_year#16, d_dom#17]
(19) Scan parquet default.household_demographics
Output [4]: [hd_demo_sk#14, hd_buy_potential#15, hd_dep_count#16, hd_vehicle_count#17]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_dom), GreaterThanOrEqual(d_dom,1), LessThanOrEqual(d_dom,2), In(d_year, [1998,1999,2000]), In(d_date_sk, [2451790,2451119,2451180,2451454,2450874,2450906,2450967,2451485,2451850,2451514,2451270,2451758,2451028,2451546,2450997,2450996,2451393,2451667,2451453,2451819,2450905,2451331,2451577,2451089,2451301,2451545,2451605,2451851,2451181,2451149,2451820,2451362,2451392,2451240,2450935,2451637,2451484,2451058,2451300,2451727,2451759,2450815,2451698,2451150,2451332,2451606,2451666,2451211,2450846,2450875,2450966,2450936,2451361,2451212,2451880,2451059,2451789,2451423,2451576,2450816,2451088,2451728,2451027,2451120,2451881,2451697,2450847,2451271,2451636,2451515,2451424,2451239]), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int,d_dom:int>
Location [not included in comparison]/{warehouse_dir}/household_demographics]
PushedFilters: [IsNotNull(hd_vehicle_count), IsNotNull(hd_dep_count), Or(EqualTo(hd_buy_potential,>10000 ),EqualTo(hd_buy_potential,Unknown )), GreaterThan(hd_vehicle_count,0), IsNotNull(hd_demo_sk)]
ReadSchema: struct<hd_demo_sk:int,hd_buy_potential:string,hd_dep_count:int,hd_vehicle_count:int>

(20) ColumnarToRow
Input [3]: [d_date_sk#15, d_year#16, d_dom#17]
Input [4]: [hd_demo_sk#14, hd_buy_potential#15, hd_dep_count#16, hd_vehicle_count#17]

(21) Filter
Input [3]: [d_date_sk#15, d_year#16, d_dom#17]
Condition : (((((isnotnull(d_dom#17) AND (d_dom#17 >= 1)) AND (d_dom#17 <= 2)) AND d_year#16 IN (1998,1999,2000)) AND d_date_sk#15 INSET (2451790,2451119,2451180,2451454,2450874,2450906,2450967,2451485,2451850,2451514,2451270,2451758,2451028,2451546,2450997,2450996,2451393,2451667,2451453,2451819,2450905,2451331,2451577,2451089,2451301,2451545,2451605,2451851,2451181,2451149,2451820,2451362,2451392,2451240,2450935,2451637,2451484,2451058,2451300,2451727,2451759,2450815,2451698,2451150,2451332,2451606,2451666,2451211,2450846,2450875,2450966,2450936,2451361,2451212,2451880,2451059,2451789,2451423,2451576,2450816,2451088,2451728,2451027,2451120,2451881,2451697,2450847,2451271,2451636,2451515,2451424,2451239)) AND isnotnull(d_date_sk#15))
Input [4]: [hd_demo_sk#14, hd_buy_potential#15, hd_dep_count#16, hd_vehicle_count#17]
Condition : (((((isnotnull(hd_vehicle_count#17) AND isnotnull(hd_dep_count#16)) AND ((hd_buy_potential#15 = >10000 ) OR (hd_buy_potential#15 = Unknown ))) AND (hd_vehicle_count#17 > 0)) AND ((cast(hd_dep_count#16 as double) / cast(hd_vehicle_count#17 as double)) > 1.0)) AND isnotnull(hd_demo_sk#14))

(22) Project
Output [1]: [d_date_sk#15]
Input [3]: [d_date_sk#15, d_year#16, d_dom#17]
Output [1]: [hd_demo_sk#14]
Input [4]: [hd_demo_sk#14, hd_buy_potential#15, hd_dep_count#16, hd_vehicle_count#17]

(23) BroadcastHashJoin [codegen id : 4]
Left keys [1]: [ss_sold_date_sk#5]
Right keys [1]: [d_date_sk#15]
Left keys [1]: [ss_hdemo_sk#2]
Right keys [1]: [hd_demo_sk#14]
Join condition: None

(24) Project [codegen id : 4]
Output [2]: [ss_customer_sk#1, ss_ticket_number#4]
Input [4]: [ss_customer_sk#1, ss_ticket_number#4, ss_sold_date_sk#5, d_date_sk#15]
Input [4]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_ticket_number#4, hd_demo_sk#14]

(25) HashAggregate [codegen id : 4]
Input [2]: [ss_customer_sk#1, ss_ticket_number#4]
@@ -15,39 +15,39 @@ WholeStageCodegen (7)
WholeStageCodegen (4)
HashAggregate [ss_ticket_number,ss_customer_sk] [count,count]
Project [ss_customer_sk,ss_ticket_number]
BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
BroadcastHashJoin [ss_hdemo_sk,hd_demo_sk]
InputAdapter
BroadcastExchange #4
WholeStageCodegen (3)
Project [ss_customer_sk,ss_ticket_number,ss_sold_date_sk]
Project [ss_customer_sk,ss_hdemo_sk,ss_ticket_number]
BroadcastHashJoin [ss_store_sk,s_store_sk]
InputAdapter
BroadcastExchange #5
WholeStageCodegen (2)
Project [ss_customer_sk,ss_store_sk,ss_ticket_number,ss_sold_date_sk]
BroadcastHashJoin [ss_hdemo_sk,hd_demo_sk]
Project [ss_customer_sk,ss_hdemo_sk,ss_store_sk,ss_ticket_number]
BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
InputAdapter
BroadcastExchange #6
WholeStageCodegen (1)
Filter [ss_store_sk,ss_hdemo_sk,ss_customer_sk]
ColumnarToRow
InputAdapter
Scan parquet default.store_sales [ss_customer_sk,ss_hdemo_sk,ss_store_sk,ss_ticket_number,ss_sold_date_sk]
Project [hd_demo_sk]
Filter [hd_vehicle_count,hd_dep_count,hd_buy_potential,hd_demo_sk]
Project [d_date_sk]
Filter [d_dom,d_year,d_date_sk]
ColumnarToRow
InputAdapter
Scan parquet default.household_demographics [hd_demo_sk,hd_buy_potential,hd_dep_count,hd_vehicle_count]
Scan parquet default.date_dim [d_date_sk,d_year,d_dom]
Project [s_store_sk]
Filter [s_county,s_store_sk]
ColumnarToRow
InputAdapter
Scan parquet default.store [s_store_sk,s_county]
Project [d_date_sk]
Filter [d_dom,d_year,d_date_sk]
Project [hd_demo_sk]
Filter [hd_vehicle_count,hd_dep_count,hd_buy_potential,hd_demo_sk]
ColumnarToRow
InputAdapter
Scan parquet default.date_dim [d_date_sk,d_year,d_dom]
Scan parquet default.household_demographics [hd_demo_sk,hd_buy_potential,hd_dep_count,hd_vehicle_count]
Filter [c_customer_sk]
ColumnarToRow
InputAdapter