Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -991,13 +991,23 @@ object TransposeWindow extends Rule[LogicalPlan] {
})
}

/**
 * Decides whether the outer window `w1` and the inner window `w2` may swap places.
 *
 * All of the following must hold:
 *  - `w1` references none of the attributes in `w2.windowOutputSet`;
 *  - every expression of `w1` and of `w2` is deterministic;
 *  - `compatiblePartitions` accepts the two partition specs (`w1`'s first, `w2`'s second).
 *
 * @param w1 the outer (parent) window
 * @param w2 the inner (child) window
 * @return true only when every condition above is satisfied
 */
private def windowsCompatible(w1: Window, w2: Window): Boolean = {
  // Local defs keep each check lazy, so `&&` still short-circuits left to right.
  def outerIgnoresInnerOutput: Boolean =
    w1.references.intersect(w2.windowOutputSet).isEmpty

  def everythingDeterministic: Boolean =
    Seq(w1, w2).forall(_.expressions.forall(_.deterministic))

  def partitionSpecsAgree: Boolean =
    compatiblePartitions(w1.partitionSpec, w2.partitionSpec)

  outerIgnoresInnerOutput && everythingDeterministic && partitionSpecsAgree
}

/**
 * Rewrites the plan bottom-up, swapping a parent [[Window]] with a child [[Window]]
 * whenever `windowsCompatible` accepts the pair.
 *
 * Two shapes are handled:
 *  - `Window` directly over `Window`;
 *  - `Window` over `Project` over `Window`, provided the outer window reads only
 *    attributes produced by the grandchild.
 *
 * In both shapes a `Project` is placed on top of the swapped windows.
 *
 * @param plan the logical plan to rewrite
 * @return the plan with every matching window pair transposed
 */
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
  // Adjacent windows. This single case replaces two cases that matched the same shape
  // under the same condition (one spelling the guard out inline), which left the later
  // one unreachable. The Project on top re-emits exactly w1's original output.
  case w1 @ Window(_, _, _, w2 @ Window(_, _, _, grandChild))
      if windowsCompatible(w1, w2) =>
    Project(w1.output, w2.copy(child = w1.copy(child = grandChild)))

  // Windows separated by a Project. w1 is moved below the Project, so it is only
  // transposed when everything it references is already available from grandChild.
  // The new top-level Project is the original project list plus w1's window output.
  case w1 @ Window(_, _, _, Project(pl, w2 @ Window(_, _, _, grandChild)))
      if windowsCompatible(w1, w2) && w1.references.subsetOf(grandChild.outputSet) =>
    Project(
      pl ++ w1.windowOutputSet,
      w2.copy(child = w1.copy(child = grandChild)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,35 @@ class TransposeWindowSuite extends PlanTest {
comparePlans(optimized, analyzed)
}

test("SPARK-34807: transpose two windows with compatible partitions " +
  "and a Project between them") {
  // Input shape: Window(partitionSpec1) over Project over Window(partitionSpec2).
  // The Project only renames the inner window's output (_we0 -> sum_a_2).
  val inputPlan = testRelation
    .window(Seq(sum(c).as("_we0")), partitionSpec2, orderSpec2)
    .select(a, b, c, d, $"_we0" as "sum_a_2")
    .window(Seq(sum(c).as('sum_a_1)), partitionSpec1, orderSpec1)

  val actual = Optimize.execute(inputPlan.analyze)

  // Expected shape: the two windows swapped, with a single Project on top that
  // carries the original select list followed by the outer window's output.
  val expected = testRelation
    .window(Seq(sum(c).as('sum_a_1)), partitionSpec1, orderSpec1)
    .window(Seq(sum(c).as('_we0)), partitionSpec2, orderSpec2)
    .select('a, 'b, 'c, 'd, $"_we0" as "sum_a_2", 'sum_a_1)
    .analyze

  comparePlans(actual, expected)
}

test("SPARK-34807: don't transpose two windows if project between them " +
  "generates an input column") {
  // The Project between the windows introduces `e` (c + d), and the outer window
  // aggregates over `e`, so the plan is expected to come back unchanged.
  val analyzedPlan = testRelation
    .window(Seq(sum(c).as('sum_a_2)), partitionSpec2, orderSpec2)
    .select(a, b, c, d, $"sum_a_2", c + d as "e")
    .window(Seq(sum($"e").as('sum_a_1)), partitionSpec1, orderSpec1)
    .analyze

  val actual = Optimize.execute(analyzedPlan)

  comparePlans(actual, analyzedPlan)
}

}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,84 +1,81 @@
TakeOrderedAndProject [sum_sales,avg_monthly_sales,s_store_name,i_category,i_brand,s_company_name,d_year,d_moy,psum,nsum]
WholeStageCodegen (23)
WholeStageCodegen (22)
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,avg_monthly_sales,sum_sales,sum_sales,sum_sales]
BroadcastHashJoin [i_category,i_brand,s_store_name,s_company_name,rn,i_category,i_brand,s_store_name,s_company_name,rn]
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales,avg_monthly_sales,rn,sum_sales]
BroadcastHashJoin [i_category,i_brand,s_store_name,s_company_name,rn,i_category,i_brand,s_store_name,s_company_name,rn]
Filter [d_year,avg_monthly_sales,sum_sales]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (8)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name] #1
WholeStageCodegen (7)
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales,avg_monthly_sales]
InputAdapter
Window [_w0,i_category,i_brand,s_store_name,s_company_name,d_year]
WholeStageCodegen (6)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name,d_year] #2
WholeStageCodegen (5)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,_w0,sum]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy] #3
WholeStageCodegen (4)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,ss_sales_price] [sum,sum]
Project [i_brand,i_category,ss_sales_price,d_year,d_moy,s_store_name,s_company_name]
BroadcastHashJoin [ss_store_sk,s_store_sk]
Project [i_brand,i_category,ss_store_sk,ss_sales_price,d_year,d_moy]
BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
Project [i_brand,i_category,ss_store_sk,ss_sales_price,ss_sold_date_sk]
BroadcastHashJoin [i_item_sk,ss_item_sk]
Filter [i_item_sk,i_category,i_brand]
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales,avg_monthly_sales,rn]
Filter [avg_monthly_sales,sum_sales]
InputAdapter
Window [_w0,i_category,i_brand,s_store_name,s_company_name,d_year]
WholeStageCodegen (7)
Filter [d_year]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (6)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name] #1
WholeStageCodegen (5)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,_w0,sum]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy] #2
WholeStageCodegen (4)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,ss_sales_price] [sum,sum]
Project [i_brand,i_category,ss_sales_price,d_year,d_moy,s_store_name,s_company_name]
BroadcastHashJoin [ss_store_sk,s_store_sk]
Project [i_brand,i_category,ss_store_sk,ss_sales_price,d_year,d_moy]
BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
Project [i_brand,i_category,ss_store_sk,ss_sales_price,ss_sold_date_sk]
BroadcastHashJoin [i_item_sk,ss_item_sk]
Filter [i_item_sk,i_category,i_brand]
ColumnarToRow
InputAdapter
Scan parquet default.item [i_item_sk,i_brand,i_category]
InputAdapter
BroadcastExchange #3
WholeStageCodegen (1)
Filter [ss_item_sk,ss_store_sk]
ColumnarToRow
InputAdapter
Scan parquet default.item [i_item_sk,i_brand,i_category]
Scan parquet default.store_sales [ss_item_sk,ss_store_sk,ss_sales_price,ss_sold_date_sk]
SubqueryBroadcast [d_date_sk] #1
ReusedExchange [d_date_sk,d_year,d_moy] #4
InputAdapter
BroadcastExchange #4
WholeStageCodegen (2)
Filter [d_year,d_moy,d_date_sk]
ColumnarToRow
InputAdapter
BroadcastExchange #4
WholeStageCodegen (1)
Filter [ss_item_sk,ss_store_sk]
ColumnarToRow
InputAdapter
Scan parquet default.store_sales [ss_item_sk,ss_store_sk,ss_sales_price,ss_sold_date_sk]
SubqueryBroadcast [d_date_sk] #1
ReusedExchange [d_date_sk,d_year,d_moy] #5
Scan parquet default.date_dim [d_date_sk,d_year,d_moy]
InputAdapter
BroadcastExchange #5
WholeStageCodegen (3)
Filter [s_store_sk,s_store_name,s_company_name]
ColumnarToRow
InputAdapter
BroadcastExchange #5
WholeStageCodegen (2)
Filter [d_year,d_moy,d_date_sk]
ColumnarToRow
InputAdapter
Scan parquet default.date_dim [d_date_sk,d_year,d_moy]
InputAdapter
BroadcastExchange #6
WholeStageCodegen (3)
Filter [s_store_sk,s_store_name,s_company_name]
ColumnarToRow
InputAdapter
Scan parquet default.store [s_store_sk,s_store_name,s_company_name]
Scan parquet default.store [s_store_sk,s_store_name,s_company_name]
InputAdapter
BroadcastExchange #7
WholeStageCodegen (15)
BroadcastExchange #6
WholeStageCodegen (14)
Project [i_category,i_brand,s_store_name,s_company_name,sum_sales,rn]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (14)
WholeStageCodegen (13)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name] #8
WholeStageCodegen (13)
Exchange [i_category,i_brand,s_store_name,s_company_name] #7
WholeStageCodegen (12)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,sum]
InputAdapter
ReusedExchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] #9
ReusedExchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] #8
InputAdapter
BroadcastExchange #10
WholeStageCodegen (22)
BroadcastExchange #9
WholeStageCodegen (21)
Project [i_category,i_brand,s_store_name,s_company_name,sum_sales,rn]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (21)
WholeStageCodegen (20)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
ReusedExchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales] #8
ReusedExchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales] #7
Loading