From 64c679704e08a99662bce374f34867ca05ebfda9 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Fri, 6 Jan 2023 09:17:22 +0100 Subject: [PATCH 1/5] Return optimized logical write query plan as promised in comment --- .../spark/sql/execution/datasources/V1WriteCommandSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala index e9c5c77e6d98..50d31645dab4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala @@ -58,7 +58,7 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils { protected def executeAndCheckOrdering( hasLogicalSort: Boolean, orderingMatched: Boolean, - hasEmpty2Null: Boolean = false)(query: => Unit): Unit = { + hasEmpty2Null: Boolean = false)(query: => Unit): LogicalPlan = { var optimizedPlan: LogicalPlan = null val listener = new QueryExecutionListener { @@ -97,6 +97,8 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils { s"Expect hasEmpty2Null: $hasEmpty2Null, Actual: $empty2nullExpr. Plan:\n$optimizedPlan") spark.listenerManager.unregister(listener) + + optimizedPlan } } From eb22ff85c7d3c4079236d61bfba10179929c73ab Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Fri, 6 Jan 2023 12:18:44 +0100 Subject: [PATCH 2/5] Materialize adaptive plan before checking actual ordering, add tests --- .../adaptive/AdaptiveSparkPlanExec.scala | 2 + .../datasources/FileFormatWriter.scala | 40 +++++++- .../datasources/V1WriteCommandSuite.scala | 98 ++++++++++++++++--- 3 files changed, 122 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 62a75e753455..395e5468b640 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -225,6 +225,8 @@ case class AdaptiveSparkPlanExec( .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe) } + def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity) + private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized { if (isFinalPlan) return currentPhysicalPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 8ce87d6fbe14..b9837705c8f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.connector.write.WriterCommitMessage import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -62,6 +63,11 @@ object FileFormatWriter extends Logging { */ 
  private[sql] var outputOrderingMatched: Boolean = false
 
+  /**
+   * A variable used in tests to check the final executed plan.
+   */
+  private[sql] var executedPlan: Option[SparkPlan] = None
+
   // scalastyle:off argcount
   /**
    * Basic work flow of this command is:
@@ -138,9 +144,21 @@ object FileFormatWriter extends Logging {
     val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
       writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
     val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan)
+
+    // SPARK-40588: when planned writing is disabled and AQE is enabled, the
+    // plan contains an AdaptiveSparkPlanExec, which does not know its final
+    // plan's ordering, so we have to materialize that plan first. It is fine
+    // to use `plan` further down, as the final plan is cached in that plan.
+    def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
+      case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
+      case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
+    }
+
     // the sort order doesn't matter
     // Use the output ordering from the original plan before adding the empty2null projection.
-    val actualOrdering = writeFilesOpt.map(_.child).getOrElse(plan).outputOrdering.map(_.child)
+    val actualOrdering = writeFilesOpt.map(_.child)
+      .getOrElse(materializeAdaptiveSparkPlan(plan))
+      .outputOrdering.map(_.child)
     val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering)
 
     SQLExecution.checkSQLExecutionId(sparkSession)
@@ -189,6 +207,7 @@ object FileFormatWriter extends Logging {
       partitionColumns: Seq[Attribute],
       sortColumns: Seq[Attribute],
       orderingMatched: Boolean): Set[String] = {
+    Console.println(f"Writing plan $plan")
     val hasEmpty2Null = plan.exists(p => V1WritesUtils.hasEmptyToNull(p.expressions))
     val empty2NullPlan = if (hasEmpty2Null) {
       plan
@@ -198,19 +217,25 @@ object FileFormatWriter extends Logging {
     }
 
     writeAndCommit(job, description, committer) {
-      val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
-        (empty2NullPlan.execute(), None)
+      val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) {
+        (empty2NullPlan, None)
       } else {
         val sortPlan = createSortPlan(empty2NullPlan, requiredOrdering, outputSpec)
         val concurrentOutputWriterSpec = createConcurrentOutputWriterSpec(
           sparkSession, sortPlan, sortColumns)
         if (concurrentOutputWriterSpec.isDefined) {
-          (empty2NullPlan.execute(), concurrentOutputWriterSpec)
+          (empty2NullPlan, concurrentOutputWriterSpec)
         } else {
-          (sortPlan.execute(), concurrentOutputWriterSpec)
+          (sortPlan, concurrentOutputWriterSpec)
         }
       }
+      // In testing, this is the only way to get hold of the actually executed plan written to file
+      if (Utils.isTesting) executedPlan = Some(planToExecute)
+      Console.println(f"executing plan $planToExecute")
+
+      val rdd = planToExecute.execute()
+
       // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
       // partition rdd to make sure we at least set up one write task to write the metadata.
val rddWithNonEmptyPartitions = if (rdd.partitions.length == 0) { @@ -278,9 +303,14 @@ object FileFormatWriter extends Logging { planForWrites: SparkPlan, writeFilesSpec: WriteFilesSpec, job: Job): Set[String] = { + Console.println(f"Writing planForWrites $planForWrites") val committer = writeFilesSpec.committer val description = writeFilesSpec.description + // In testing, this is the only way to get hold of the actually executed plan written to file + if (Utils.isTesting) executedPlan = Some(planForWrites) + Console.println(f"executing plan $planForWrites") + writeAndCommit(job, description, committer) { val rdd = planForWrites.executeWrite(writeFilesSpec) val ret = new Array[WriteTaskResult](rdd.partitions.length) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala index 50d31645dab4..f62cae21418e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala @@ -18,10 +18,12 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} -import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.{QueryExecution, SortExec} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} +import org.apache.spark.sql.types.{IntegerType, StringType} import org.apache.spark.sql.util.QueryExecutionListener trait V1WriteCommandSuiteBase extends SQLTestUtils { @@ -52,13 +54,12 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils { } /** - * Execute a write query and check ordering of the plan. Return the optimized logical write - * query plan. + * Execute a write query and check ordering of the plan. */ protected def executeAndCheckOrdering( hasLogicalSort: Boolean, orderingMatched: Boolean, - hasEmpty2Null: Boolean = false)(query: => Unit): LogicalPlan = { + hasEmpty2Null: Boolean = false)(query: => Unit): Unit = { var optimizedPlan: LogicalPlan = null val listener = new QueryExecutionListener { @@ -97,8 +98,6 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils { s"Expect hasEmpty2Null: $hasEmpty2Null, Actual: $empty2nullExpr. Plan:\n$optimizedPlan") spark.listenerManager.unregister(listener) - - optimizedPlan } } @@ -162,12 +161,7 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write |CREATE TABLE t(i INT, k STRING) USING PARQUET |PARTITIONED BY (j INT) |""".stripMargin) - // When planned write is disabled, even though the write plan is already sorted, - // the AQE node inserted on top of the write query will remove the original - // sort orders. So the ordering will not match. This issue does not exist when - // planned write is enabled, because AQE will be applied on top of the write - // command instead of on top of the child query plan. 
- executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = enabled) { + executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) { sql("INSERT INTO t SELECT i, k, j FROM t0 ORDER BY j") } } @@ -183,13 +177,91 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write |PARTITIONED BY (k STRING) |""".stripMargin) executeAndCheckOrdering( - hasLogicalSort = true, orderingMatched = enabled, hasEmpty2Null = enabled) { + hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) { sql("INSERT INTO t SELECT * FROM t0 ORDER BY k") } } } } + test("v1 write with in-partition sorted plan - non-string partition column") { + withPlannedWrite { enabled => + withTable("t") { + sql( + """ + |CREATE TABLE t(i INT, k STRING) USING PARQUET + |PARTITIONED BY (j INT) + |""".stripMargin) + executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) { + sql("INSERT INTO t SELECT i, k, j FROM t0 SORT BY j, k") + } + + print(s"executed plan: ${FileFormatWriter.executedPlan}") + + assert(FileFormatWriter.executedPlan.isDefined) + val executedPlan = FileFormatWriter.executedPlan.get + + val plan = if (enabled) { + assert(executedPlan.isInstanceOf[WriteFilesExec]) + executedPlan.asInstanceOf[WriteFilesExec].child + } else { + executedPlan + } + + assert(plan.collectFirst { + case s: SortExec => s + }.exists { + case SortExec(Seq( + SortOrder(AttributeReference("j", IntegerType, _, _), Ascending, NullsFirst, _), + SortOrder(AttributeReference("k", StringType, _, _), Ascending, NullsFirst, _), + ), false, _, _) => true + case _ => false + }, plan) + } + } + } + + test("v1 write with in-partition sorted plan - string partition column") { + withPlannedWrite { enabled => + withTable("t") { + sql( + """ + |CREATE TABLE t(i INT, j INT) USING PARQUET + |PARTITIONED BY (k STRING) + |""".stripMargin) + executeAndCheckOrdering( + hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) { + sql("INSERT INTO t SELECT * FROM t0 SORT BY k, j") + } + + print(s"executed plan: ${FileFormatWriter.executedPlan}") + + assert(FileFormatWriter.executedPlan.isDefined) + val executedPlan = FileFormatWriter.executedPlan.get + + val plan = if (enabled) { + assert(executedPlan.isInstanceOf[WriteFilesExec]) + executedPlan.asInstanceOf[WriteFilesExec].child + } else { + executedPlan + } + + assert(plan.collectFirst { + case s: SortExec => s + }.map(s => (enabled, s)).exists { + case (false, SortExec(Seq( + SortOrder(AttributeReference("k", StringType, _, _), Ascending, NullsFirst, _), + SortOrder(AttributeReference("j", IntegerType, _, _), Ascending, NullsFirst, _), + ), false, _, _)) => true + case (true, SortExec(Seq( + SortOrder(AttributeReference("k", StringType, _, _), Ascending, NullsFirst, _), + ), false, _, _)) => true + case _ => false + }, plan) + } + } + } + test("v1 write with null and empty string column values") { withPlannedWrite { enabled => withTempPath { path => From a68d605969b61eea7f1f6b9bb4af8aff6970d349 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Fri, 6 Jan 2023 13:10:27 +0100 Subject: [PATCH 3/5] Test in-partition order with AQE (requires join) --- .../datasources/FileFormatWriter.scala | 6 +- .../datasources/V1WriteCommandSuite.scala | 97 +++++++++++-------- 2 files changed, 61 insertions(+), 42 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index b9837705c8f5..bbf657117e9d 
100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -207,7 +207,6 @@ object FileFormatWriter extends Logging { partitionColumns: Seq[Attribute], sortColumns: Seq[Attribute], orderingMatched: Boolean): Set[String] = { - Console.println(f"Writing plan $plan") val hasEmpty2Null = plan.exists(p => V1WritesUtils.hasEmptyToNull(p.expressions)) val empty2NullPlan = if (hasEmpty2Null) { plan @@ -232,7 +231,7 @@ object FileFormatWriter extends Logging { // In testing, this is the only way to get hold of the actually executed plan written to file if (Utils.isTesting) executedPlan = Some(planToExecute) - Console.println(f"executing plan $planToExecute") + Console.println(f"executing planToExecute $planToExecute") val rdd = planToExecute.execute() @@ -303,13 +302,12 @@ object FileFormatWriter extends Logging { planForWrites: SparkPlan, writeFilesSpec: WriteFilesSpec, job: Job): Set[String] = { - Console.println(f"Writing planForWrites $planForWrites") val committer = writeFilesSpec.committer val description = writeFilesSpec.description // In testing, this is the only way to get hold of the actually executed plan written to file if (Utils.isTesting) executedPlan = Some(planForWrites) - Console.println(f"executing plan $planForWrites") + Console.println(f"executing planForWrites $planForWrites") writeAndCommit(job, description, committer) { val rdd = planForWrites.executeWrite(writeFilesSpec) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala index f62cae21418e..cb08d52b37af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala @@ -20,9 +20,10 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.{QueryExecution, SortExec} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} +import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession} import org.apache.spark.sql.types.{IntegerType, StringType} import org.apache.spark.sql.util.QueryExecutionListener @@ -184,54 +185,70 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write } } - test("v1 write with in-partition sorted plan - non-string partition column") { - withPlannedWrite { enabled => - withTable("t") { - sql( - """ - |CREATE TABLE t(i INT, k STRING) USING PARQUET - |PARTITIONED BY (j INT) - |""".stripMargin) - executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) { - sql("INSERT INTO t SELECT i, k, j FROM t0 SORT BY j, k") - } + test("SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + withPlannedWrite { enabled => + withTable("t") { + sql( + """ + |CREATE TABLE t(b INT, value STRING) USING PARQUET + |PARTITIONED BY (key INT) + |""".stripMargin) + 
executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) { + sql( + """ + |INSERT INTO t + |SELECT b, value, key + |FROM testData JOIN testData2 ON key = a + |SORT BY key, value + |""".stripMargin) + } - print(s"executed plan: ${FileFormatWriter.executedPlan}") + print(s"executed plan: ${FileFormatWriter.executedPlan}") - assert(FileFormatWriter.executedPlan.isDefined) - val executedPlan = FileFormatWriter.executedPlan.get + assert(FileFormatWriter.executedPlan.isDefined) + val executedPlan = FileFormatWriter.executedPlan.get - val plan = if (enabled) { - assert(executedPlan.isInstanceOf[WriteFilesExec]) - executedPlan.asInstanceOf[WriteFilesExec].child - } else { - executedPlan - } + val plan = if (enabled) { + assert(executedPlan.isInstanceOf[WriteFilesExec]) + executedPlan.asInstanceOf[WriteFilesExec].child + } else { + executedPlan.transformDown { + case a: AdaptiveSparkPlanExec => a.executedPlan + } + } - assert(plan.collectFirst { - case s: SortExec => s - }.exists { - case SortExec(Seq( - SortOrder(AttributeReference("j", IntegerType, _, _), Ascending, NullsFirst, _), - SortOrder(AttributeReference("k", StringType, _, _), Ascending, NullsFirst, _), - ), false, _, _) => true - case _ => false - }, plan) + assert(plan.collectFirst { + case s: SortExec => s + }.exists { + case SortExec(Seq( + SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _), + SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _), + ), false, _, _) => true + case _ => false + }, plan) + } } } } - test("v1 write with in-partition sorted plan - string partition column") { + test("SPARK-41914: v1 write with AQE and in-partition sorted - string partition column") { withPlannedWrite { enabled => withTable("t") { sql( """ - |CREATE TABLE t(i INT, j INT) USING PARQUET - |PARTITIONED BY (k STRING) + |CREATE TABLE t(key INT, b INT) USING PARQUET + |PARTITIONED BY (value STRING) |""".stripMargin) executeAndCheckOrdering( hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) { - sql("INSERT INTO t SELECT * FROM t0 SORT BY k, j") + sql( + """ + |INSERT INTO t + |SELECT key, b, value + |FROM testData JOIN testData2 ON key = a + |SORT BY value, key + |""".stripMargin) } print(s"executed plan: ${FileFormatWriter.executedPlan}") @@ -243,18 +260,22 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write assert(executedPlan.isInstanceOf[WriteFilesExec]) executedPlan.asInstanceOf[WriteFilesExec].child } else { - executedPlan + executedPlan.transformDown { + case a: AdaptiveSparkPlanExec => a.executedPlan + } } assert(plan.collectFirst { case s: SortExec => s }.map(s => (enabled, s)).exists { case (false, SortExec(Seq( - SortOrder(AttributeReference("k", StringType, _, _), Ascending, NullsFirst, _), - SortOrder(AttributeReference("j", IntegerType, _, _), Ascending, NullsFirst, _), + SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _), + SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _), ), false, _, _)) => true + + // SPARK-40885: this bug removes the in-partition sort, which manifests here case (true, SortExec(Seq( - SortOrder(AttributeReference("k", StringType, _, _), Ascending, NullsFirst, _), + SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _), ), false, _, _)) => true case _ => false }, plan) From 8414715e5d14a69527c745585ffec2dc0ffbe980 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Fri, 6 Jan 2023 13:14:18 +0100 
Subject: [PATCH 4/5] Remove printouts, add comments, fix imports

---
 .../sql/execution/datasources/FileFormatWriter.scala  |  2 --
 .../execution/datasources/V1WriteCommandSuite.scala   | 12 ++++++------
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index bbf657117e9d..6285095c647e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -231,7 +231,6 @@ object FileFormatWriter extends Logging {
       // In testing, this is the only way to get hold of the actually executed plan written to file
       if (Utils.isTesting) executedPlan = Some(planToExecute)
-      Console.println(f"executing planToExecute $planToExecute")
 
       val rdd = planToExecute.execute()
 
@@ -307,7 +306,6 @@ object FileFormatWriter extends Logging {
     // In testing, this is the only way to get hold of the actually executed plan written to file
     if (Utils.isTesting) executedPlan = Some(planForWrites)
-    Console.println(f"executing planForWrites $planForWrites")
 
     writeAndCommit(job, description, committer) {
       val rdd = planForWrites.executeWrite(writeFilesSpec)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index cb08d52b37af..c469265ab7d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.execution.datasources
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.{QueryExecution, SortExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession}
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.sql.types.{IntegerType, StringType}
 import org.apache.spark.sql.util.QueryExecutionListener
 
@@ -204,8 +204,7 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
             |""".stripMargin)
         }
 
-        print(s"executed plan: ${FileFormatWriter.executedPlan}")
-
+        // inspect the actually executed plan (differs from what executeAndCheckOrdering checks)
         assert(FileFormatWriter.executedPlan.isDefined)
         val executedPlan = FileFormatWriter.executedPlan.get
 
@@ -218,6 +217,7 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
           }
         }
 
+        // assert the outermost sort in the executed plan
         assert(plan.collectFirst {
           case s: SortExec => s
         }.exists {
@@ -251,8 +251,7 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
           |""".stripMargin)
       }
 
-      print(s"executed plan: ${FileFormatWriter.executedPlan}")
-
+      // inspect the actually executed plan (differs from what executeAndCheckOrdering checks)
       assert(FileFormatWriter.executedPlan.isDefined)
       val executedPlan = FileFormatWriter.executedPlan.get
 
@@ -265,6 +264,7 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
         }
       }
 
+      // assert the outermost sort in the executed plan
       assert(plan.collectFirst {
         case s: SortExec => s
       }.map(s => (enabled, s)).exists {

From 0ae3ce59cfe8437d8b809cf60e487a86151dc2de Mon Sep 17 00:00:00 2001
From: Enrico Minack
Date: Fri, 6 Jan 2023 13:44:00 +0100
Subject: [PATCH 5/5] Fix Scala lint errors

---
 .../sql/execution/datasources/V1WriteCommandSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index c469265ab7d3..80d0369044cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -223,7 +223,7 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
         }.exists {
           case SortExec(Seq(
             SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _),
-            SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _),
+            SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
           ), false, _, _) => true
           case _ => false
         }, plan)
@@ -270,12 +270,12 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
       }.map(s => (enabled, s)).exists {
         case (false, SortExec(Seq(
           SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _),
-          SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _),
+          SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _)
         ), false, _, _)) => true
 
         // SPARK-40885: this bug removes the in-partition sort, which manifests here
         case (true, SortExec(Seq(
-          SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _),
+          SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
         ), false, _, _)) => true
         case _ => false
       }, plan)
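
Note for reviewers: below is a minimal sketch of how the test hook introduced in
PATCH 2 (FileFormatWriter.executedPlan) and the AdaptiveSparkPlanExec unwrapping
used in the suites fit together outside of V1WriteCommandSuite. The table names
(demo_t, demo_src) and sort columns are made up for illustration; the snippet
assumes a suite that mixes in SharedSparkSession and runs under the test runner,
since executedPlan is only populated when Utils.isTesting is true:

    import org.apache.spark.sql.execution.SortExec
    import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
    import org.apache.spark.sql.execution.datasources.FileFormatWriter

    // Hypothetical tables; any partitioned INSERT with a SORT BY exercises the hook.
    sql("CREATE TABLE demo_t(i INT, k STRING) USING PARQUET PARTITIONED BY (j INT)")
    sql("INSERT INTO demo_t SELECT i, k, j FROM demo_src SORT BY j, k")

    // FileFormatWriter.executedPlan captures the physical plan that actually
    // wrote the files, which the optimized-logical-plan listener cannot see.
    val written = FileFormatWriter.executedPlan.get

    // With planned write disabled, the captured plan may still wrap an
    // AdaptiveSparkPlanExec node; unwrap it to reach the final physical plan.
    val finalPlan = written.transformDown {
      case a: AdaptiveSparkPlanExec => a.executedPlan
    }

    // The in-partition sort requested by SORT BY must survive as a SortExec.
    assert(finalPlan.collectFirst { case s: SortExec => s }.isDefined, finalPlan)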