Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,16 @@ object JoinReorderDP extends PredicateHelper with Logging {
// Level i maintains all found plans for i + 1 items.
// Create the initial plans: each plan is a single item with zero cost.
val itemIndex = items.zipWithIndex
val foundPlans = mutable.Buffer[JoinPlanMap](itemIndex.map {
case (item, id) => Set(id) -> JoinPlan(Set(id), item, ExpressionSet(), Cost(0, 0))
}.toMap)
val foundPlans = mutable.Buffer[JoinPlanMap]({
// SPARK-32687: Change to use `LinkedHashMap` to make sure that items are
// inserted and iterated in the same order.
val joinPlanMap = new JoinPlanMap
itemIndex.foreach {
case (item, id) =>
joinPlanMap.put(Set(id), JoinPlan(Set(id), item, ExpressionSet(), Cost(0, 0)))
}
joinPlanMap
})

// Build filters from the join graph to be used by the search algorithm.
val filters = JoinReorderDPFilters.buildJoinGraphInfo(conf, items, conditions, itemIndex)
Expand Down Expand Up @@ -198,7 +205,7 @@ object JoinReorderDP extends PredicateHelper with Logging {
topOutput: AttributeSet,
filters: Option[JoinGraphInfo]): JoinPlanMap = {

val nextLevel = mutable.Map.empty[Set[Int], JoinPlan]
val nextLevel = new JoinPlanMap
var k = 0
val lev = existingLevels.length - 1
// Build plans for the next level from plans at level k (one side of the join) and level
Expand Down Expand Up @@ -231,7 +238,7 @@ object JoinReorderDP extends PredicateHelper with Logging {
}
k += 1
}
nextLevel.toMap
nextLevel
}

/**
Expand Down Expand Up @@ -316,7 +323,7 @@ object JoinReorderDP extends PredicateHelper with Logging {
}

/** Map[set of item ids, join plan for these items] */
type JoinPlanMap = Map[Set[Int], JoinPlan]
type JoinPlanMap = mutable.LinkedHashMap[Set[Int], JoinPlan]

/**
* Partial join order in a specific level.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,8 @@ class StarJoinCostBasedReorderSuite extends JoinReorderPlanTestBase with StatsEs

val expected =
f1.join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk")))
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
.join(t4.join(t3, Inner, Some(nameToAttr("t3_c2") === nameToAttr("t4_c2"))), Inner,
Some(nameToAttr("d1_c2") === nameToAttr("t3_c1")))
.join(t2.join(t1, Inner, Some(nameToAttr("t1_c2") === nameToAttr("t2_c2"))), Inner,
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ TakeOrderedAndProject [i_item_id,s_state,g_state,agg1,agg2,agg3,agg4]
Project [ss_item_sk,ss_store_sk,ss_quantity,ss_list_price,ss_sales_price,ss_coupon_amt]
BroadcastHashJoin [ss_cdemo_sk,cd_demo_sk]
Project [ss_item_sk,ss_cdemo_sk,ss_store_sk,ss_quantity,ss_list_price,ss_sales_price,ss_coupon_amt]
BroadcastHashJoin [d_date_sk,ss_sold_date_sk]
BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
Filter [ss_sold_date_sk,ss_cdemo_sk,ss_store_sk,ss_item_sk]
ColumnarToRow
InputAdapter
Scan parquet default.store_sales [ss_sold_date_sk,ss_item_sk,ss_cdemo_sk,ss_store_sk,ss_quantity,ss_list_price,ss_sales_price,ss_coupon_amt]
InputAdapter
BroadcastExchange #2
WholeStageCodegen (1)
Expand All @@ -22,10 +26,6 @@ TakeOrderedAndProject [i_item_id,s_state,g_state,agg1,agg2,agg3,agg4]
ColumnarToRow
InputAdapter
Scan parquet default.date_dim [d_date_sk,d_year]
Filter [ss_sold_date_sk,ss_cdemo_sk,ss_store_sk,ss_item_sk]
ColumnarToRow
InputAdapter
Scan parquet default.store_sales [ss_sold_date_sk,ss_item_sk,ss_cdemo_sk,ss_store_sk,ss_quantity,ss_list_price,ss_sales_price,ss_coupon_amt]
InputAdapter
BroadcastExchange #3
WholeStageCodegen (2)
Expand Down Expand Up @@ -61,13 +61,13 @@ TakeOrderedAndProject [i_item_id,s_state,g_state,agg1,agg2,agg3,agg4]
Project [ss_item_sk,ss_cdemo_sk,ss_quantity,ss_list_price,ss_sales_price,ss_coupon_amt]
BroadcastHashJoin [ss_store_sk,s_store_sk]
Project [ss_item_sk,ss_cdemo_sk,ss_store_sk,ss_quantity,ss_list_price,ss_sales_price,ss_coupon_amt]
BroadcastHashJoin [d_date_sk,ss_sold_date_sk]
InputAdapter
ReusedExchange [d_date_sk] #2
BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
Filter [ss_sold_date_sk,ss_cdemo_sk,ss_store_sk,ss_item_sk]
ColumnarToRow
InputAdapter
Scan parquet default.store_sales [ss_sold_date_sk,ss_item_sk,ss_cdemo_sk,ss_store_sk,ss_quantity,ss_list_price,ss_sales_price,ss_coupon_amt]
InputAdapter
ReusedExchange [d_date_sk] #2
InputAdapter
BroadcastExchange #7
WholeStageCodegen (8)
Expand All @@ -93,13 +93,13 @@ TakeOrderedAndProject [i_item_id,s_state,g_state,agg1,agg2,agg3,agg4]
Project [ss_item_sk,ss_cdemo_sk,ss_quantity,ss_list_price,ss_sales_price,ss_coupon_amt]
BroadcastHashJoin [ss_store_sk,s_store_sk]
Project [ss_item_sk,ss_cdemo_sk,ss_store_sk,ss_quantity,ss_list_price,ss_sales_price,ss_coupon_amt]
BroadcastHashJoin [d_date_sk,ss_sold_date_sk]
InputAdapter
ReusedExchange [d_date_sk] #2
BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
Filter [ss_sold_date_sk,ss_cdemo_sk,ss_store_sk,ss_item_sk]
ColumnarToRow
InputAdapter
Scan parquet default.store_sales [ss_sold_date_sk,ss_item_sk,ss_cdemo_sk,ss_store_sk,ss_quantity,ss_list_price,ss_sales_price,ss_coupon_amt]
InputAdapter
ReusedExchange [d_date_sk] #2
InputAdapter
ReusedExchange [s_store_sk] #7
InputAdapter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ TakeOrderedAndProject (34)
: :- * Project (17)
: : +- * BroadcastHashJoin Inner BuildRight (16)
: : :- * Project (10)
: : : +- * BroadcastHashJoin Inner BuildLeft (9)
: : : :- BroadcastExchange (5)
: : : : +- * Project (4)
: : : : +- * Filter (3)
: : : : +- * ColumnarToRow (2)
: : : : +- Scan parquet default.date_dim (1)
: : : +- * Filter (8)
: : : +- * ColumnarToRow (7)
: : : +- Scan parquet default.store_sales (6)
: : : +- * BroadcastHashJoin Inner BuildRight (9)
: : : :- * Filter (3)
: : : : +- * ColumnarToRow (2)
: : : : +- Scan parquet default.store_sales (1)
: : : +- BroadcastExchange (8)
: : : +- * Project (7)
: : : +- * Filter (6)
: : : +- * ColumnarToRow (5)
: : : +- Scan parquet default.date_dim (4)
: : +- BroadcastExchange (15)
: : +- * Project (14)
: : +- * Filter (13)
Expand All @@ -35,50 +35,50 @@ TakeOrderedAndProject (34)
+- Scan parquet default.item (25)


(1) Scan parquet default.date_dim
Output [2]: [d_date_sk#1, d_year#2]
(1) Scan parquet default.store_sales
Output [8]: [ss_sold_date_sk#1, ss_item_sk#2, ss_cdemo_sk#3, ss_promo_sk#4, ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), EqualTo(d_year,1998), GreaterThanOrEqual(d_date_sk,2450815), LessThanOrEqual(d_date_sk,2451179), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int>
Location [not included in comparison]/{warehouse_dir}/store_sales]
PushedFilters: [IsNotNull(ss_sold_date_sk), GreaterThanOrEqual(ss_sold_date_sk,2450815), LessThanOrEqual(ss_sold_date_sk,2451179), IsNotNull(ss_cdemo_sk), IsNotNull(ss_item_sk), IsNotNull(ss_promo_sk)]
ReadSchema: struct<ss_sold_date_sk:int,ss_item_sk:int,ss_cdemo_sk:int,ss_promo_sk:int,ss_quantity:int,ss_list_price:decimal(7,2),ss_sales_price:decimal(7,2),ss_coupon_amt:decimal(7,2)>

(2) ColumnarToRow [codegen id : 1]
Input [2]: [d_date_sk#1, d_year#2]
(2) ColumnarToRow [codegen id : 5]
Input [8]: [ss_sold_date_sk#1, ss_item_sk#2, ss_cdemo_sk#3, ss_promo_sk#4, ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8]

(3) Filter [codegen id : 1]
Input [2]: [d_date_sk#1, d_year#2]
Condition : ((((isnotnull(d_year#2) AND (d_year#2 = 1998)) AND (d_date_sk#1 >= 2450815)) AND (d_date_sk#1 <= 2451179)) AND isnotnull(d_date_sk#1))
(3) Filter [codegen id : 5]
Input [8]: [ss_sold_date_sk#1, ss_item_sk#2, ss_cdemo_sk#3, ss_promo_sk#4, ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8]
Condition : (((((isnotnull(ss_sold_date_sk#1) AND (ss_sold_date_sk#1 >= 2450815)) AND (ss_sold_date_sk#1 <= 2451179)) AND isnotnull(ss_cdemo_sk#3)) AND isnotnull(ss_item_sk#2)) AND isnotnull(ss_promo_sk#4))

(4) Project [codegen id : 1]
Output [1]: [d_date_sk#1]
Input [2]: [d_date_sk#1, d_year#2]
(4) Scan parquet default.date_dim
Output [2]: [d_date_sk#9, d_year#10]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), EqualTo(d_year,1998), GreaterThanOrEqual(d_date_sk,2450815), LessThanOrEqual(d_date_sk,2451179), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_year:int>

(5) BroadcastExchange
Input [1]: [d_date_sk#1]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#3]
(5) ColumnarToRow [codegen id : 1]
Input [2]: [d_date_sk#9, d_year#10]

(6) Scan parquet default.store_sales
Output [8]: [ss_sold_date_sk#4, ss_item_sk#5, ss_cdemo_sk#6, ss_promo_sk#7, ss_quantity#8, ss_list_price#9, ss_sales_price#10, ss_coupon_amt#11]
Batched: true
Location [not included in comparison]/{warehouse_dir}/store_sales]
PushedFilters: [IsNotNull(ss_sold_date_sk), GreaterThanOrEqual(ss_sold_date_sk,2450815), LessThanOrEqual(ss_sold_date_sk,2451179), IsNotNull(ss_cdemo_sk), IsNotNull(ss_item_sk), IsNotNull(ss_promo_sk)]
ReadSchema: struct<ss_sold_date_sk:int,ss_item_sk:int,ss_cdemo_sk:int,ss_promo_sk:int,ss_quantity:int,ss_list_price:decimal(7,2),ss_sales_price:decimal(7,2),ss_coupon_amt:decimal(7,2)>
(6) Filter [codegen id : 1]
Input [2]: [d_date_sk#9, d_year#10]
Condition : ((((isnotnull(d_year#10) AND (d_year#10 = 1998)) AND (d_date_sk#9 >= 2450815)) AND (d_date_sk#9 <= 2451179)) AND isnotnull(d_date_sk#9))

(7) ColumnarToRow
Input [8]: [ss_sold_date_sk#4, ss_item_sk#5, ss_cdemo_sk#6, ss_promo_sk#7, ss_quantity#8, ss_list_price#9, ss_sales_price#10, ss_coupon_amt#11]
(7) Project [codegen id : 1]
Output [1]: [d_date_sk#9]
Input [2]: [d_date_sk#9, d_year#10]

(8) Filter
Input [8]: [ss_sold_date_sk#4, ss_item_sk#5, ss_cdemo_sk#6, ss_promo_sk#7, ss_quantity#8, ss_list_price#9, ss_sales_price#10, ss_coupon_amt#11]
Condition : (((((isnotnull(ss_sold_date_sk#4) AND (ss_sold_date_sk#4 >= 2450815)) AND (ss_sold_date_sk#4 <= 2451179)) AND isnotnull(ss_cdemo_sk#6)) AND isnotnull(ss_item_sk#5)) AND isnotnull(ss_promo_sk#7))
(8) BroadcastExchange
Input [1]: [d_date_sk#9]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#11]

(9) BroadcastHashJoin [codegen id : 5]
Left keys [1]: [d_date_sk#1]
Right keys [1]: [ss_sold_date_sk#4]
Left keys [1]: [ss_sold_date_sk#1]
Right keys [1]: [d_date_sk#9]
Join condition: None

(10) Project [codegen id : 5]
Output [7]: [ss_item_sk#5, ss_cdemo_sk#6, ss_promo_sk#7, ss_quantity#8, ss_list_price#9, ss_sales_price#10, ss_coupon_amt#11]
Input [9]: [d_date_sk#1, ss_sold_date_sk#4, ss_item_sk#5, ss_cdemo_sk#6, ss_promo_sk#7, ss_quantity#8, ss_list_price#9, ss_sales_price#10, ss_coupon_amt#11]
Output [7]: [ss_item_sk#2, ss_cdemo_sk#3, ss_promo_sk#4, ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8]
Input [9]: [ss_sold_date_sk#1, ss_item_sk#2, ss_cdemo_sk#3, ss_promo_sk#4, ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8, d_date_sk#9]

(11) Scan parquet default.promotion
Output [3]: [p_promo_sk#12, p_channel_email#13, p_channel_event#14]
Expand All @@ -103,13 +103,13 @@ Input [1]: [p_promo_sk#12]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#15]

(16) BroadcastHashJoin [codegen id : 5]
Left keys [1]: [ss_promo_sk#7]
Left keys [1]: [ss_promo_sk#4]
Right keys [1]: [p_promo_sk#12]
Join condition: None

(17) Project [codegen id : 5]
Output [6]: [ss_item_sk#5, ss_cdemo_sk#6, ss_quantity#8, ss_list_price#9, ss_sales_price#10, ss_coupon_amt#11]
Input [8]: [ss_item_sk#5, ss_cdemo_sk#6, ss_promo_sk#7, ss_quantity#8, ss_list_price#9, ss_sales_price#10, ss_coupon_amt#11, p_promo_sk#12]
Output [6]: [ss_item_sk#2, ss_cdemo_sk#3, ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8]
Input [8]: [ss_item_sk#2, ss_cdemo_sk#3, ss_promo_sk#4, ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8, p_promo_sk#12]

(18) Scan parquet default.customer_demographics
Output [4]: [cd_demo_sk#16, cd_gender#17, cd_marital_status#18, cd_education_status#19]
Expand All @@ -134,13 +134,13 @@ Input [1]: [cd_demo_sk#16]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#20]

(23) BroadcastHashJoin [codegen id : 5]
Left keys [1]: [ss_cdemo_sk#6]
Left keys [1]: [ss_cdemo_sk#3]
Right keys [1]: [cd_demo_sk#16]
Join condition: None

(24) Project [codegen id : 5]
Output [5]: [ss_item_sk#5, ss_quantity#8, ss_list_price#9, ss_sales_price#10, ss_coupon_amt#11]
Input [7]: [ss_item_sk#5, ss_cdemo_sk#6, ss_quantity#8, ss_list_price#9, ss_sales_price#10, ss_coupon_amt#11, cd_demo_sk#16]
Output [5]: [ss_item_sk#2, ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8]
Input [7]: [ss_item_sk#2, ss_cdemo_sk#3, ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8, cd_demo_sk#16]

(25) Scan parquet default.item
Output [2]: [i_item_sk#21, i_item_id#22]
Expand All @@ -161,18 +161,18 @@ Input [2]: [i_item_sk#21, i_item_id#22]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#23]

(29) BroadcastHashJoin [codegen id : 5]
Left keys [1]: [ss_item_sk#5]
Left keys [1]: [ss_item_sk#2]
Right keys [1]: [i_item_sk#21]
Join condition: None

(30) Project [codegen id : 5]
Output [5]: [ss_quantity#8, ss_list_price#9, ss_sales_price#10, ss_coupon_amt#11, i_item_id#22]
Input [7]: [ss_item_sk#5, ss_quantity#8, ss_list_price#9, ss_sales_price#10, ss_coupon_amt#11, i_item_sk#21, i_item_id#22]
Output [5]: [ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8, i_item_id#22]
Input [7]: [ss_item_sk#2, ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8, i_item_sk#21, i_item_id#22]

(31) HashAggregate [codegen id : 5]
Input [5]: [ss_quantity#8, ss_list_price#9, ss_sales_price#10, ss_coupon_amt#11, i_item_id#22]
Input [5]: [ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8, i_item_id#22]
Keys [1]: [i_item_id#22]
Functions [4]: [partial_avg(cast(ss_quantity#8 as bigint)), partial_avg(UnscaledValue(ss_list_price#9)), partial_avg(UnscaledValue(ss_coupon_amt#11)), partial_avg(UnscaledValue(ss_sales_price#10))]
Functions [4]: [partial_avg(cast(ss_quantity#5 as bigint)), partial_avg(UnscaledValue(ss_list_price#6)), partial_avg(UnscaledValue(ss_coupon_amt#8)), partial_avg(UnscaledValue(ss_sales_price#7))]
Aggregate Attributes [8]: [sum#24, count#25, sum#26, count#27, sum#28, count#29, sum#30, count#31]
Results [9]: [i_item_id#22, sum#32, count#33, sum#34, count#35, sum#36, count#37, sum#38, count#39]

Expand All @@ -183,9 +183,9 @@ Arguments: hashpartitioning(i_item_id#22, 5), true, [id=#40]
(33) HashAggregate [codegen id : 6]
Input [9]: [i_item_id#22, sum#32, count#33, sum#34, count#35, sum#36, count#37, sum#38, count#39]
Keys [1]: [i_item_id#22]
Functions [4]: [avg(cast(ss_quantity#8 as bigint)), avg(UnscaledValue(ss_list_price#9)), avg(UnscaledValue(ss_coupon_amt#11)), avg(UnscaledValue(ss_sales_price#10))]
Aggregate Attributes [4]: [avg(cast(ss_quantity#8 as bigint))#41, avg(UnscaledValue(ss_list_price#9))#42, avg(UnscaledValue(ss_coupon_amt#11))#43, avg(UnscaledValue(ss_sales_price#10))#44]
Results [5]: [i_item_id#22, avg(cast(ss_quantity#8 as bigint))#41 AS agg1#45, cast((avg(UnscaledValue(ss_list_price#9))#42 / 100.0) as decimal(11,6)) AS agg2#46, cast((avg(UnscaledValue(ss_coupon_amt#11))#43 / 100.0) as decimal(11,6)) AS agg3#47, cast((avg(UnscaledValue(ss_sales_price#10))#44 / 100.0) as decimal(11,6)) AS agg4#48]
Functions [4]: [avg(cast(ss_quantity#5 as bigint)), avg(UnscaledValue(ss_list_price#6)), avg(UnscaledValue(ss_coupon_amt#8)), avg(UnscaledValue(ss_sales_price#7))]
Aggregate Attributes [4]: [avg(cast(ss_quantity#5 as bigint))#41, avg(UnscaledValue(ss_list_price#6))#42, avg(UnscaledValue(ss_coupon_amt#8))#43, avg(UnscaledValue(ss_sales_price#7))#44]
Results [5]: [i_item_id#22, avg(cast(ss_quantity#5 as bigint))#41 AS agg1#45, cast((avg(UnscaledValue(ss_list_price#6))#42 / 100.0) as decimal(11,6)) AS agg2#46, cast((avg(UnscaledValue(ss_coupon_amt#8))#43 / 100.0) as decimal(11,6)) AS agg3#47, cast((avg(UnscaledValue(ss_sales_price#7))#44 / 100.0) as decimal(11,6)) AS agg4#48]

(34) TakeOrderedAndProject
Input [5]: [i_item_id#22, agg1#45, agg2#46, agg3#47, agg4#48]
Expand Down
Loading