Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ object Union {
}

/**
* Logical plan for unioning two plans, without a distinct. This is UNION ALL in SQL.
* Logical plan for unioning multiple plans, without a distinct. This is UNION ALL in SQL.
*
* @param byName Whether to resolve columns in the children by column names.
* @param allowMissingCol Allows missing columns in children query plans. If it is true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,15 @@ object BasicStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {

/** Statistics for a [[ScriptTransformation]]: delegates to `default`. */
override def visitScriptTransform(p: ScriptTransformation): Statistics = default(p)

/** Statistics for a [[Union]]: delegates to `fallback`. */
override def visitUnion(p: Union): Statistics = fallback(p)
/**
 * Estimates statistics for a [[Union]] by combining the statistics of its children.
 *
 * `sizeInBytes` is the sum of the children's sizes. `rowCount` is the sum of the children's
 * row counts, but only when every child reports one; otherwise it is left undefined.
 */
override def visitUnion(p: Union): Statistics = {
  val childStats = p.children.map(_.stats)
  val totalSize = childStats.map(_.sizeInBytes).sum
  // A single child without a row count makes the combined row count unknown.
  val totalRows =
    if (childStats.forall(_.rowCount.isDefined)) {
      Some(childStats.flatMap(_.rowCount).sum)
    } else {
      None
    }
  Statistics(sizeInBytes = totalSize, rowCount = totalRows)
}
Comment on lines +82 to +90
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same logic as the following, with the row count added:

override def visitUnion(p: Union): Statistics = {
Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).sum)
}


/** Statistics for a [[Window]]: delegates to `fallback`. */
override def visitWindow(p: Window): Statistics = fallback(p)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
expectedStatsCboOff = Statistics(sizeInBytes = 120))
}

test("SPARK-34031: Union operator missing rowCount when enable CBO") {
  // Union three copies of the same child, so the expected totals are the child's
  // size and row count multiplied by the number of children.
  val unionPlan = Union(List(plan, plan, plan))
  val numChildren = unionPlan.children.size
  val expectedSize = plan.size.get * numChildren
  val expectedRows = Some(plan.rowCount * numChildren)

  // Row count is only expected when CBO is on; size is expected either way.
  val withCbo = Statistics(sizeInBytes = expectedSize, rowCount = expectedRows)
  val withoutCbo = Statistics(sizeInBytes = expectedSize)
  checkStats(
    unionPlan,
    expectedStatsCboOn = withCbo,
    expectedStatsCboOff = withoutCbo)
}

/** Check estimated stats when cbo is turned on/off. */
private def checkStats(
plan: LogicalPlan,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,43 @@
== Physical Plan ==
* Sort (41)
+- Exchange (40)
+- * Project (39)
+- * SortMergeJoin Inner (38)
:- * Sort (26)
: +- * Project (25)
: +- * BroadcastHashJoin Inner BuildRight (24)
: :- * HashAggregate (18)
: : +- Exchange (17)
: : +- * HashAggregate (16)
: : +- * Project (15)
: : +- * BroadcastHashJoin Inner BuildRight (14)
: : :- Union (9)
: : : :- * Project (4)
: : : : +- * Filter (3)
: : : : +- * ColumnarToRow (2)
: : : : +- Scan parquet default.web_sales (1)
: : : +- * Project (8)
: : : +- * Filter (7)
: : : +- * ColumnarToRow (6)
: : : +- Scan parquet default.catalog_sales (5)
: : +- BroadcastExchange (13)
: : +- * Filter (12)
: : +- * ColumnarToRow (11)
: : +- Scan parquet default.date_dim (10)
: +- BroadcastExchange (23)
: +- * Project (22)
: +- * Filter (21)
: +- * ColumnarToRow (20)
: +- Scan parquet default.date_dim (19)
+- * Sort (37)
+- Exchange (36)
+- * Project (35)
+- * BroadcastHashJoin Inner BuildRight (34)
:- * HashAggregate (28)
: +- ReusedExchange (27)
+- BroadcastExchange (33)
+- * Project (32)
+- * Filter (31)
+- * ColumnarToRow (30)
+- Scan parquet default.date_dim (29)
* Sort (39)
+- Exchange (38)
+- * Project (37)
+- * BroadcastHashJoin Inner BuildRight (36)
:- * Project (25)
: +- * BroadcastHashJoin Inner BuildRight (24)
: :- * HashAggregate (18)
: : +- Exchange (17)
: : +- * HashAggregate (16)
: : +- * Project (15)
: : +- * BroadcastHashJoin Inner BuildRight (14)
: : :- Union (9)
: : : :- * Project (4)
: : : : +- * Filter (3)
: : : : +- * ColumnarToRow (2)
: : : : +- Scan parquet default.web_sales (1)
: : : +- * Project (8)
: : : +- * Filter (7)
: : : +- * ColumnarToRow (6)
: : : +- Scan parquet default.catalog_sales (5)
: : +- BroadcastExchange (13)
: : +- * Filter (12)
: : +- * ColumnarToRow (11)
: : +- Scan parquet default.date_dim (10)
: +- BroadcastExchange (23)
: +- * Project (22)
: +- * Filter (21)
: +- * ColumnarToRow (20)
: +- Scan parquet default.date_dim (19)
+- BroadcastExchange (35)
+- * Project (34)
+- * BroadcastHashJoin Inner BuildRight (33)
:- * HashAggregate (27)
: +- ReusedExchange (26)
+- BroadcastExchange (32)
+- * Project (31)
+- * Filter (30)
+- * ColumnarToRow (29)
+- Scan parquet default.date_dim (28)


(1) Scan parquet default.web_sales
Expand Down Expand Up @@ -116,9 +114,9 @@ Results [8]: [d_week_seq#10, sum#20, sum#21, sum#22, sum#23, sum#24, sum#25, sum

(17) Exchange
Input [8]: [d_week_seq#10, sum#20, sum#21, sum#22, sum#23, sum#24, sum#25, sum#26]
Arguments: hashpartitioning(d_week_seq#10, 5), true, [id=#27]
Arguments: hashpartitioning(d_week_seq#10, 5), ENSURE_REQUIREMENTS, [id=#27]

(18) HashAggregate [codegen id : 6]
(18) HashAggregate [codegen id : 12]
Input [8]: [d_week_seq#10, sum#20, sum#21, sum#22, sum#23, sum#24, sum#25, sum#26]
Keys [1]: [d_week_seq#10]
Functions [7]: [sum(UnscaledValue(CASE WHEN (d_day_name#11 = Sunday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Monday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Tuesday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Wednesday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Thursday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Friday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Saturday) THEN sales_price#4 ELSE null END))]
Expand Down Expand Up @@ -147,82 +145,74 @@ Input [2]: [d_week_seq#42, d_year#43]
Input [1]: [d_week_seq#42]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#44]

(24) BroadcastHashJoin [codegen id : 6]
(24) BroadcastHashJoin [codegen id : 12]
Left keys [1]: [d_week_seq#10]
Right keys [1]: [d_week_seq#42]
Join condition: None

(25) Project [codegen id : 6]
(25) Project [codegen id : 12]
Output [8]: [d_week_seq#10 AS d_week_seq1#45, sun_sales#35 AS sun_sales1#46, mon_sales#36 AS mon_sales1#47, tue_sales#37 AS tue_sales1#48, wed_sales#38 AS wed_sales1#49, thu_sales#39 AS thu_sales1#50, fri_sales#40 AS fri_sales1#51, sat_sales#41 AS sat_sales1#52]
Input [9]: [d_week_seq#10, sun_sales#35, mon_sales#36, tue_sales#37, wed_sales#38, thu_sales#39, fri_sales#40, sat_sales#41, d_week_seq#42]

(26) Sort [codegen id : 6]
Input [8]: [d_week_seq1#45, sun_sales1#46, mon_sales1#47, tue_sales1#48, wed_sales1#49, thu_sales1#50, fri_sales1#51, sat_sales1#52]
Arguments: [d_week_seq1#45 ASC NULLS FIRST], false, 0

(27) ReusedExchange [Reuses operator id: 17]
(26) ReusedExchange [Reuses operator id: 17]
Output [8]: [d_week_seq#10, sum#53, sum#54, sum#55, sum#56, sum#57, sum#58, sum#59]

(28) HashAggregate [codegen id : 12]
(27) HashAggregate [codegen id : 11]
Input [8]: [d_week_seq#10, sum#53, sum#54, sum#55, sum#56, sum#57, sum#58, sum#59]
Keys [1]: [d_week_seq#10]
Functions [7]: [sum(UnscaledValue(CASE WHEN (d_day_name#11 = Sunday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Monday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Tuesday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Wednesday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Thursday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Friday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Saturday) THEN sales_price#4 ELSE null END))]
Aggregate Attributes [7]: [sum(UnscaledValue(CASE WHEN (d_day_name#11 = Sunday) THEN sales_price#4 ELSE null END))#60, sum(UnscaledValue(CASE WHEN (d_day_name#11 = Monday) THEN sales_price#4 ELSE null END))#61, sum(UnscaledValue(CASE WHEN (d_day_name#11 = Tuesday) THEN sales_price#4 ELSE null END))#62, sum(UnscaledValue(CASE WHEN (d_day_name#11 = Wednesday) THEN sales_price#4 ELSE null END))#63, sum(UnscaledValue(CASE WHEN (d_day_name#11 = Thursday) THEN sales_price#4 ELSE null END))#64, sum(UnscaledValue(CASE WHEN (d_day_name#11 = Friday) THEN sales_price#4 ELSE null END))#65, sum(UnscaledValue(CASE WHEN (d_day_name#11 = Saturday) THEN sales_price#4 ELSE null END))#66]
Results [8]: [d_week_seq#10, MakeDecimal(sum(UnscaledValue(CASE WHEN (d_day_name#11 = Sunday) THEN sales_price#4 ELSE null END))#60,17,2) AS sun_sales#35, MakeDecimal(sum(UnscaledValue(CASE WHEN (d_day_name#11 = Monday) THEN sales_price#4 ELSE null END))#61,17,2) AS mon_sales#36, MakeDecimal(sum(UnscaledValue(CASE WHEN (d_day_name#11 = Tuesday) THEN sales_price#4 ELSE null END))#62,17,2) AS tue_sales#37, MakeDecimal(sum(UnscaledValue(CASE WHEN (d_day_name#11 = Wednesday) THEN sales_price#4 ELSE null END))#63,17,2) AS wed_sales#38, MakeDecimal(sum(UnscaledValue(CASE WHEN (d_day_name#11 = Thursday) THEN sales_price#4 ELSE null END))#64,17,2) AS thu_sales#39, MakeDecimal(sum(UnscaledValue(CASE WHEN (d_day_name#11 = Friday) THEN sales_price#4 ELSE null END))#65,17,2) AS fri_sales#40, MakeDecimal(sum(UnscaledValue(CASE WHEN (d_day_name#11 = Saturday) THEN sales_price#4 ELSE null END))#66,17,2) AS sat_sales#41]

(29) Scan parquet default.date_dim
(28) Scan parquet default.date_dim
Output [2]: [d_week_seq#67, d_year#68]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2002), IsNotNull(d_week_seq)]
ReadSchema: struct<d_week_seq:int,d_year:int>

(30) ColumnarToRow [codegen id : 11]
(29) ColumnarToRow [codegen id : 10]
Input [2]: [d_week_seq#67, d_year#68]

(31) Filter [codegen id : 11]
(30) Filter [codegen id : 10]
Input [2]: [d_week_seq#67, d_year#68]
Condition : ((isnotnull(d_year#68) AND (d_year#68 = 2002)) AND isnotnull(d_week_seq#67))

(32) Project [codegen id : 11]
(31) Project [codegen id : 10]
Output [1]: [d_week_seq#67]
Input [2]: [d_week_seq#67, d_year#68]

(33) BroadcastExchange
(32) BroadcastExchange
Input [1]: [d_week_seq#67]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#69]

(34) BroadcastHashJoin [codegen id : 12]
(33) BroadcastHashJoin [codegen id : 11]
Left keys [1]: [d_week_seq#10]
Right keys [1]: [d_week_seq#67]
Join condition: None

(35) Project [codegen id : 12]
(34) Project [codegen id : 11]
Output [8]: [d_week_seq#10 AS d_week_seq2#70, sun_sales#35 AS sun_sales2#71, mon_sales#36 AS mon_sales2#72, tue_sales#37 AS tue_sales2#73, wed_sales#38 AS wed_sales2#74, thu_sales#39 AS thu_sales2#75, fri_sales#40 AS fri_sales2#76, sat_sales#41 AS sat_sales2#77]
Input [9]: [d_week_seq#10, sun_sales#35, mon_sales#36, tue_sales#37, wed_sales#38, thu_sales#39, fri_sales#40, sat_sales#41, d_week_seq#67]

(36) Exchange
Input [8]: [d_week_seq2#70, sun_sales2#71, mon_sales2#72, tue_sales2#73, wed_sales2#74, thu_sales2#75, fri_sales2#76, sat_sales2#77]
Arguments: hashpartitioning((d_week_seq2#70 - 53), 5), true, [id=#78]

(37) Sort [codegen id : 13]
(35) BroadcastExchange
Input [8]: [d_week_seq2#70, sun_sales2#71, mon_sales2#72, tue_sales2#73, wed_sales2#74, thu_sales2#75, fri_sales2#76, sat_sales2#77]
Arguments: [(d_week_seq2#70 - 53) ASC NULLS FIRST], false, 0
Arguments: HashedRelationBroadcastMode(List(cast((input[0, int, true] - 53) as bigint)),false), [id=#78]

(38) SortMergeJoin [codegen id : 14]
(36) BroadcastHashJoin [codegen id : 12]
Left keys [1]: [d_week_seq1#45]
Right keys [1]: [(d_week_seq2#70 - 53)]
Join condition: None

(39) Project [codegen id : 14]
(37) Project [codegen id : 12]
Output [8]: [d_week_seq1#45, round(CheckOverflow((promote_precision(sun_sales1#46) / promote_precision(sun_sales2#71)), DecimalType(37,20), true), 2) AS round((sun_sales1 / sun_sales2), 2)#79, round(CheckOverflow((promote_precision(mon_sales1#47) / promote_precision(mon_sales2#72)), DecimalType(37,20), true), 2) AS round((mon_sales1 / mon_sales2), 2)#80, round(CheckOverflow((promote_precision(tue_sales1#48) / promote_precision(tue_sales2#73)), DecimalType(37,20), true), 2) AS round((tue_sales1 / tue_sales2), 2)#81, round(CheckOverflow((promote_precision(wed_sales1#49) / promote_precision(wed_sales2#74)), DecimalType(37,20), true), 2) AS round((wed_sales1 / wed_sales2), 2)#82, round(CheckOverflow((promote_precision(thu_sales1#50) / promote_precision(thu_sales2#75)), DecimalType(37,20), true), 2) AS round((thu_sales1 / thu_sales2), 2)#83, round(CheckOverflow((promote_precision(fri_sales1#51) / promote_precision(fri_sales2#76)), DecimalType(37,20), true), 2) AS round((fri_sales1 / fri_sales2), 2)#84, round(CheckOverflow((promote_precision(sat_sales1#52) / promote_precision(sat_sales2#77)), DecimalType(37,20), true), 2) AS round((sat_sales1 / sat_sales2), 2)#85]
Input [16]: [d_week_seq1#45, sun_sales1#46, mon_sales1#47, tue_sales1#48, wed_sales1#49, thu_sales1#50, fri_sales1#51, sat_sales1#52, d_week_seq2#70, sun_sales2#71, mon_sales2#72, tue_sales2#73, wed_sales2#74, thu_sales2#75, fri_sales2#76, sat_sales2#77]

(40) Exchange
(38) Exchange
Input [8]: [d_week_seq1#45, round((sun_sales1 / sun_sales2), 2)#79, round((mon_sales1 / mon_sales2), 2)#80, round((tue_sales1 / tue_sales2), 2)#81, round((wed_sales1 / wed_sales2), 2)#82, round((thu_sales1 / thu_sales2), 2)#83, round((fri_sales1 / fri_sales2), 2)#84, round((sat_sales1 / sat_sales2), 2)#85]
Arguments: rangepartitioning(d_week_seq1#45 ASC NULLS FIRST, 5), true, [id=#86]
Arguments: rangepartitioning(d_week_seq1#45 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [id=#86]

(41) Sort [codegen id : 15]
(39) Sort [codegen id : 13]
Input [8]: [d_week_seq1#45, round((sun_sales1 / sun_sales2), 2)#79, round((mon_sales1 / mon_sales2), 2)#80, round((tue_sales1 / tue_sales2), 2)#81, round((wed_sales1 / wed_sales2), 2)#82, round((thu_sales1 / thu_sales2), 2)#83, round((fri_sales1 / fri_sales2), 2)#84, round((sat_sales1 / sat_sales2), 2)#85]
Arguments: [d_week_seq1#45 ASC NULLS FIRST], true, 0

Loading