diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 62a75e753455..395e5468b640 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -225,6 +225,8 @@ case class AdaptiveSparkPlanExec(
       .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
   }
 
+  def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity)
+
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
     if (isFinalPlan) return currentPhysicalPlan
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 8ce87d6fbe14..6285095c647e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
@@ -62,6 +63,11 @@ object FileFormatWriter extends Logging {
    */
   private[sql] var outputOrderingMatched: Boolean = false
 
+  /**
+   * A variable used in tests to check the final executed plan.
+   */
+  private[sql] var executedPlan: Option[SparkPlan] = None
+
   // scalastyle:off argcount
   /**
   * Basic work flow of this command is:
@@ -138,9 +144,21 @@ object FileFormatWriter extends Logging {
     val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
       writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
     val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan)
+
+    // SPARK-40588: when planned writing is disabled and AQE is enabled, `plan` contains an
+    // AdaptiveSparkPlanExec, which does not know its final plan's ordering, so we have to
+    // materialize that plan first. It is fine to keep using `plan` further down, because the
+    // final plan is cached inside the AdaptiveSparkPlanExec node.
+    def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
+      case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
+      case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
+    }
+
     // the sort order doesn't matter
     // Use the output ordering from the original plan before adding the empty2null projection.
-    val actualOrdering = writeFilesOpt.map(_.child).getOrElse(plan).outputOrdering.map(_.child)
+    val actualOrdering = writeFilesOpt.map(_.child)
+      .getOrElse(materializeAdaptiveSparkPlan(plan))
+      .outputOrdering.map(_.child)
     val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering)
 
     SQLExecution.checkSQLExecutionId(sparkSession)
@@ -198,19 +216,24 @@ object FileFormatWriter extends Logging {
     }
 
     writeAndCommit(job, description, committer) {
-      val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
-        (empty2NullPlan.execute(), None)
+      val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) {
+        (empty2NullPlan, None)
       } else {
         val sortPlan = createSortPlan(empty2NullPlan, requiredOrdering, outputSpec)
         val concurrentOutputWriterSpec = createConcurrentOutputWriterSpec(
           sparkSession, sortPlan, sortColumns)
         if (concurrentOutputWriterSpec.isDefined) {
-          (empty2NullPlan.execute(), concurrentOutputWriterSpec)
+          (empty2NullPlan, concurrentOutputWriterSpec)
         } else {
-          (sortPlan.execute(), concurrentOutputWriterSpec)
+          (sortPlan, concurrentOutputWriterSpec)
         }
       }
 
+      // In testing, this is the only way to get hold of the actually executed plan written to file
+      if (Utils.isTesting) executedPlan = Some(planToExecute)
+
+      val rdd = planToExecute.execute()
+
       // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
       // partition rdd to make sure we at least set up one write task to write the metadata.
       val rddWithNonEmptyPartitions = if (rdd.partitions.length == 0) {
@@ -281,6 +304,9 @@ object FileFormatWriter extends Logging {
     val committer = writeFilesSpec.committer
     val description = writeFilesSpec.description
 
+    // In testing, this is the only way to get hold of the actually executed plan written to file
+    if (Utils.isTesting) executedPlan = Some(planForWrites)
+
     writeAndCommit(job, description, committer) {
       val rdd = planForWrites.executeWrite(writeFilesSpec)
       val ret = new Array[WriteTaskResult](rdd.partitions.length)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index e9c5c77e6d98..80d0369044cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -18,10 +18,13 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.{QueryExecution, SortExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+import org.apache.spark.sql.types.{IntegerType, StringType}
 import org.apache.spark.sql.util.QueryExecutionListener
 
 trait V1WriteCommandSuiteBase extends SQLTestUtils {
@@ -52,8 +55,7 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
   }
 
   /**
-   * Execute a write query and check ordering of the plan. Return the optimized logical write
-   * query plan.
+   * Execute a write query and check ordering of the plan.
    */
   protected def executeAndCheckOrdering(
       hasLogicalSort: Boolean,
@@ -160,12 +162,7 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
           |CREATE TABLE t(i INT, k STRING) USING PARQUET
           |PARTITIONED BY (j INT)
           |""".stripMargin)
-      // When planned write is disabled, even though the write plan is already sorted,
-      // the AQE node inserted on top of the write query will remove the original
-      // sort orders. So the ordering will not match. This issue does not exist when
-      // planned write is enabled, because AQE will be applied on top of the write
-      // command instead of on top of the child query plan.
-      executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = enabled) {
+      executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) {
         sql("INSERT INTO t SELECT i, k, j FROM t0 ORDER BY j")
       }
     }
@@ -181,13 +178,111 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
           |PARTITIONED BY (k STRING)
           |""".stripMargin)
       executeAndCheckOrdering(
-        hasLogicalSort = true, orderingMatched = enabled, hasEmpty2Null = enabled) {
+        hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) {
         sql("INSERT INTO t SELECT * FROM t0 ORDER BY k")
       }
     }
   }
 
+  test("SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      withPlannedWrite { enabled =>
+        withTable("t") {
+          sql(
+            """
+              |CREATE TABLE t(b INT, value STRING) USING PARQUET
+              |PARTITIONED BY (key INT)
+              |""".stripMargin)
+          executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) {
+            sql(
+              """
+                |INSERT INTO t
+                |SELECT b, value, key
+                |FROM testData JOIN testData2 ON key = a
+                |SORT BY key, value
+                |""".stripMargin)
+          }
+
+          // inspect the actually executed plan (that is different from executeAndCheckOrdering)
+          assert(FileFormatWriter.executedPlan.isDefined)
+          val executedPlan = FileFormatWriter.executedPlan.get
+
+          val plan = if (enabled) {
+            assert(executedPlan.isInstanceOf[WriteFilesExec])
+            executedPlan.asInstanceOf[WriteFilesExec].child
+          } else {
+            executedPlan.transformDown {
+              case a: AdaptiveSparkPlanExec => a.executedPlan
+            }
+          }
+
+          // assert the outermost sort in the executed plan
+          assert(plan.collectFirst {
+            case s: SortExec => s
+          }.exists {
+            case SortExec(Seq(
+              SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _),
+              SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
+            ), false, _, _) => true
+            case _ => false
+          }, plan)
+        }
+      }
+    }
+  }
+
+  test("SPARK-41914: v1 write with AQE and in-partition sorted - string partition column") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t(key INT, b INT) USING PARQUET
+            |PARTITIONED BY (value STRING)
+            |""".stripMargin)
+        executeAndCheckOrdering(
+          hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) {
+          sql(
+            """
+              |INSERT INTO t
+              |SELECT key, b, value
+              |FROM testData JOIN testData2 ON key = a
+              |SORT BY value, key
+              |""".stripMargin)
+        }
+
+        // inspect the actually executed plan (that is different from executeAndCheckOrdering)
+        assert(FileFormatWriter.executedPlan.isDefined)
+        val executedPlan = FileFormatWriter.executedPlan.get
+
+        val plan = if (enabled) {
+          assert(executedPlan.isInstanceOf[WriteFilesExec])
+          executedPlan.asInstanceOf[WriteFilesExec].child
+        } else {
+          executedPlan.transformDown {
+            case a: AdaptiveSparkPlanExec => a.executedPlan
+          }
+        }
+
+        // assert the outermost sort in the executed plan
+        assert(plan.collectFirst {
+          case s: SortExec => s
+        }.map(s => (enabled, s)).exists {
+          case (false, SortExec(Seq(
+            SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _),
+            SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _)
+          ), false, _, _)) => true
+
+          // SPARK-40885: this bug removes the in-partition sort, which manifests here
+          case (true, SortExec(Seq(
+            SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
+          ), false, _, _)) => true
+          case _ => false
+        }, plan)
+      }
+    }
+  }
+
   test("v1 write with null and empty string column values") {
     withPlannedWrite { enabled =>
       withTempPath { path =>
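
Note for reviewers, not part of the diff: the fix hinges on an un-materialized AdaptiveSparkPlanExec reporting no outputOrdering while its cached final plan does. Below is a minimal sketch of that behavior, assuming a session (e.g. spark-shell) with AQE enabled; the DataFrame, column name, and query are hypothetical and chosen only so a shuffle makes AQE wrap the plan:

  import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
  import org.apache.spark.sql.functions.col

  // Hypothetical query: repartition introduces a shuffle (so AQE applies),
  // sortWithinPartitions gives the final plan a non-empty output ordering.
  val df = spark.range(100).toDF("j")
    .repartition(col("j"))
    .sortWithinPartitions("j")

  df.queryExecution.executedPlan match {
    case a: AdaptiveSparkPlanExec =>
      // The AQE wrapper falls back to SparkPlan's default (no ordering),
      // which is what made orderingMatched spuriously false before this fix.
      assert(a.outputOrdering.isEmpty)
      // The new finalPhysicalPlan hook materializes (and caches) the final
      // plan, whose outermost SortExec exposes the in-partition ordering.
      assert(a.finalPhysicalPlan.outputOrdering.nonEmpty)
    case other =>
      sys.error(s"expected AdaptiveSparkPlanExec, got ${other.nodeName}")
  }

This mirrors what materializeAdaptiveSparkPlan does inside FileFormatWriter.write before computing actualOrdering, so the writer no longer inserts a redundant sort on top of an already-sorted adaptive plan.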