From 165cd914c0c002f93cd9b1be35da41d02fae17b0 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sat, 19 Dec 2020 07:24:30 +0900 Subject: [PATCH 1/6] Fix an issue that EXPLAIN FORMATTED doesn't show subquery when AQE is enabled. --- .../scala/org/apache/spark/sql/execution/ExplainUtils.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala index 20e6fb6f96ea..4bd006eab5e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala @@ -218,6 +218,8 @@ object ExplainUtils extends AdaptiveSparkPlanHelper { plan: => QueryPlan[_], subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = { plan.foreach { + case adp: AdaptiveSparkPlanExec => + getSubqueries(adp.executedPlan, subqueries) case p: SparkPlan => p.expressions.foreach (_.collect { case e: PlanExpression[_] => From 35f127aca42490ae9003415d4b6d9927116a765f Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sat, 19 Dec 2020 16:51:22 +0900 Subject: [PATCH 2/6] Add test.
--- .../org/apache/spark/sql/ExplainSuite.scala | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 75372c5437f2..c104aeca97ef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -277,6 +277,28 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } } + test("explain formatted - check presence of subquery in case of AQE") { + withTable("df1", "df2") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + withTable("df1") { + spark.range(1) + .write + .format("parquet") + .mode("overwrite") + .saveAsTable("df1") + + val sqlText = "EXPLAIN FORMATTED SELECT (SELECT id FROM df1) as v" + val expected_pattern1 = + "Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x" + + withNormalizedExplain(sqlText) { normalizedOutput => + assert(expected_pattern1.r.findAllMatchIn(normalizedOutput).length == 1) + } + } + } + } + } + test("Support ExplainMode in Dataset.explain") { val df1 = Seq((1, 2), (2, 3)).toDF("k", "v1") val df2 = Seq((2, 3), (1, 1)).toDF("k", "v2") From 2ee8d195d162bcada5f72ae40cbfe1b0ef79ec17 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sun, 20 Dec 2020 02:27:48 +0900 Subject: [PATCH 3/6] Modify test. 
--- .../src/test/scala/org/apache/spark/sql/ExplainSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index c104aeca97ef..b6f5d9948e27 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -278,16 +278,16 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } test("explain formatted - check presence of subquery in case of AQE") { - withTable("df1", "df2") { + withTable("df1") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { withTable("df1") { - spark.range(1) + spark.range(1, 100) .write .format("parquet") .mode("overwrite") .saveAsTable("df1") - val sqlText = "EXPLAIN FORMATTED SELECT (SELECT id FROM df1) as v" + val sqlText = "EXPLAIN FORMATTED SELECT (SELECT min(id) FROM df1) as v" val expected_pattern1 = "Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x" From 6cbe6a6c5c1a384ad39be6a0d43484f1b831be1d Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sun, 20 Dec 2020 02:56:20 +0900 Subject: [PATCH 4/6] Add JIRA ID to the test. 
--- sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index b6f5d9948e27..0ec57c2fcb5a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -277,7 +277,7 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } } - test("explain formatted - check presence of subquery in case of AQE") { + test("SPARK-33850: explain formatted - check presence of subquery in case of AQE") { withTable("df1") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { withTable("df1") { From 577dd8fe4cac4118a4e1a4a8c3f4ab1cf4a69102 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sun, 20 Dec 2020 03:09:41 +0900 Subject: [PATCH 5/6] Change the variable name "adp" to "a" --- .../scala/org/apache/spark/sql/execution/ExplainUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala index 4bd006eab5e4..f47542ca59bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala @@ -218,8 +218,8 @@ object ExplainUtils extends AdaptiveSparkPlanHelper { plan: => QueryPlan[_], subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = { plan.foreach { - case adp: AdaptiveSparkPlanExec => - getSubqueries(adp.executedPlan, subqueries) + case a: AdaptiveSparkPlanExec => + getSubqueries(a.executedPlan, subqueries) case p: SparkPlan => p.expressions.foreach (_.collect { case e: PlanExpression[_] => From 13fb225b3742566676009e4fc0f8d5a9d19820c0 Mon Sep 17 00:00:00 2001 From: 
Kousuke Saruta Date: Sun, 20 Dec 2020 05:07:11 +0900 Subject: [PATCH 6/6] Modify explain-aqe.sql.out to comply with the change. --- .../sql-tests/results/explain-aqe.sql.out | 263 ++++++++++++++++++ 1 file changed, 263 insertions(+) diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out index 578b0a807fc5..d68989524d48 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out @@ -407,6 +407,101 @@ Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery subq Output [2]: [key#x, val#x] Arguments: isFinalPlan=false +===== Subqueries ===== + +Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x] +AdaptiveSparkPlan (10) ++- HashAggregate (9) + +- Exchange (8) + +- HashAggregate (7) + +- Project (6) + +- Filter (5) + +- Scan parquet default.explain_temp2 (4) + + +(4) Scan parquet default.explain_temp2 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp2] +PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)] +ReadSchema: struct + +(5) Filter +Input [2]: [key#x, val#x] +Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery subquery#x, [id=#x])) AND (val#x = 2)) + +(6) Project +Output [1]: [key#x] +Input [2]: [key#x, val#x] + +(7) HashAggregate +Input [1]: [key#x] +Keys: [] +Functions [1]: [partial_max(key#x)] +Aggregate Attributes [1]: [max#x] +Results [1]: [max#x] + +(8) Exchange +Input [1]: [max#x] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x] + +(9) HashAggregate +Input [1]: [max#x] +Keys: [] +Functions [1]: [max(key#x)] +Aggregate Attributes [1]: [max(key#x)#x] +Results [1]: [max(key#x)#x AS max(key)#x] + +(10) AdaptiveSparkPlan +Output [1]: [max(key)#x] +Arguments: isFinalPlan=false + +Subquery:2 Hosting operator id = 5 Hosting 
Expression = Subquery subquery#x, [id=#x] +AdaptiveSparkPlan (17) ++- HashAggregate (16) + +- Exchange (15) + +- HashAggregate (14) + +- Project (13) + +- Filter (12) + +- Scan parquet default.explain_temp3 (11) + + +(11) Scan parquet default.explain_temp3 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp3] +PushedFilters: [IsNotNull(val), GreaterThan(val,0)] +ReadSchema: struct + +(12) Filter +Input [2]: [key#x, val#x] +Condition : (isnotnull(val#x) AND (val#x > 0)) + +(13) Project +Output [1]: [key#x] +Input [2]: [key#x, val#x] + +(14) HashAggregate +Input [1]: [key#x] +Keys: [] +Functions [1]: [partial_max(key#x)] +Aggregate Attributes [1]: [max#x] +Results [1]: [max#x] + +(15) Exchange +Input [1]: [max#x] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x] + +(16) HashAggregate +Input [1]: [max#x] +Keys: [] +Functions [1]: [max(key#x)] +Aggregate Attributes [1]: [max(key#x)#x] +Results [1]: [max(key#x)#x AS max(key)#x] + +(17) AdaptiveSparkPlan +Output [1]: [max(key)#x] +Arguments: isFinalPlan=false -- !query EXPLAIN FORMATTED @@ -442,6 +537,101 @@ Condition : ((key#x = Subquery subquery#x, [id=#x]) OR (cast(key#x as double) = Output [2]: [key#x, val#x] Arguments: isFinalPlan=false +===== Subqueries ===== + +Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x] +AdaptiveSparkPlan (10) ++- HashAggregate (9) + +- Exchange (8) + +- HashAggregate (7) + +- Project (6) + +- Filter (5) + +- Scan parquet default.explain_temp2 (4) + + +(4) Scan parquet default.explain_temp2 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp2] +PushedFilters: [IsNotNull(val), GreaterThan(val,0)] +ReadSchema: struct + +(5) Filter +Input [2]: [key#x, val#x] +Condition : (isnotnull(val#x) AND (val#x > 0)) + +(6) Project +Output [1]: [key#x] +Input [2]: [key#x, val#x] + +(7) HashAggregate +Input [1]: [key#x] +Keys: [] +Functions [1]: 
[partial_max(key#x)] +Aggregate Attributes [1]: [max#x] +Results [1]: [max#x] + +(8) Exchange +Input [1]: [max#x] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x] + +(9) HashAggregate +Input [1]: [max#x] +Keys: [] +Functions [1]: [max(key#x)] +Aggregate Attributes [1]: [max(key#x)#x] +Results [1]: [max(key#x)#x AS max(key)#x] + +(10) AdaptiveSparkPlan +Output [1]: [max(key)#x] +Arguments: isFinalPlan=false + +Subquery:2 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x] +AdaptiveSparkPlan (17) ++- HashAggregate (16) + +- Exchange (15) + +- HashAggregate (14) + +- Project (13) + +- Filter (12) + +- Scan parquet default.explain_temp3 (11) + + +(11) Scan parquet default.explain_temp3 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp3] +PushedFilters: [IsNotNull(val), GreaterThan(val,0)] +ReadSchema: struct + +(12) Filter +Input [2]: [key#x, val#x] +Condition : (isnotnull(val#x) AND (val#x > 0)) + +(13) Project +Output [1]: [key#x] +Input [2]: [key#x, val#x] + +(14) HashAggregate +Input [1]: [key#x] +Keys: [] +Functions [1]: [partial_avg(cast(key#x as bigint))] +Aggregate Attributes [2]: [sum#x, count#xL] +Results [2]: [sum#x, count#xL] + +(15) Exchange +Input [2]: [sum#x, count#xL] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x] + +(16) HashAggregate +Input [2]: [sum#x, count#xL] +Keys: [] +Functions [1]: [avg(cast(key#x as bigint))] +Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x] +Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x] + +(17) AdaptiveSparkPlan +Output [1]: [avg(key)#x] +Arguments: isFinalPlan=false -- !query EXPLAIN FORMATTED @@ -470,6 +660,79 @@ Input: [] Output [1]: [(scalarsubquery() + scalarsubquery())#x] Arguments: isFinalPlan=false +===== Subqueries ===== + +Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x] +AdaptiveSparkPlan (8) ++- HashAggregate (7) + +- Exchange (6) + +- HashAggregate (5) + +- 
Scan parquet default.explain_temp1 (4) + + +(4) Scan parquet default.explain_temp1 +Output [1]: [key#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +ReadSchema: struct + +(5) HashAggregate +Input [1]: [key#x] +Keys: [] +Functions [1]: [partial_avg(cast(key#x as bigint))] +Aggregate Attributes [2]: [sum#x, count#xL] +Results [2]: [sum#x, count#xL] + +(6) Exchange +Input [2]: [sum#x, count#xL] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x] + +(7) HashAggregate +Input [2]: [sum#x, count#xL] +Keys: [] +Functions [1]: [avg(cast(key#x as bigint))] +Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x] +Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x] + +(8) AdaptiveSparkPlan +Output [1]: [avg(key)#x] +Arguments: isFinalPlan=false + +Subquery:2 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x] +AdaptiveSparkPlan (13) ++- HashAggregate (12) + +- Exchange (11) + +- HashAggregate (10) + +- Scan parquet default.explain_temp1 (9) + + +(9) Scan parquet default.explain_temp1 +Output [1]: [key#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +ReadSchema: struct + +(10) HashAggregate +Input [1]: [key#x] +Keys: [] +Functions [1]: [partial_avg(cast(key#x as bigint))] +Aggregate Attributes [2]: [sum#x, count#xL] +Results [2]: [sum#x, count#xL] + +(11) Exchange +Input [2]: [sum#x, count#xL] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x] + +(12) HashAggregate +Input [2]: [sum#x, count#xL] +Keys: [] +Functions [1]: [avg(cast(key#x as bigint))] +Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x] +Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x] + +(13) AdaptiveSparkPlan +Output [1]: [avg(key)#x] +Arguments: isFinalPlan=false -- !query EXPLAIN FORMATTED