diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 1ac7aa00d98c6..fc941efb9eaf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -319,3 +319,46 @@ case class RDDScanExec(
 
   override def getStream: Option[SparkDataStream] = stream
 }
+
+/**
+ * A physical plan node for `OneRowRelation` for scans with no 'FROM' clause.
+ *
+ * We do not extend `RDDScanExec` in order to avoid complexity due to `TreeNode.makeCopy` and
+ * `TreeNode`'s general use of reflection.
+ */
+case class OneRowRelationExec() extends LeafExecNode
+  with InputRDDCodegen {
+
+  override val nodeName: String = s"Scan OneRowRelation"
+
+  override val output: Seq[Attribute] = Nil
+
+  private val rdd: RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    session
+      .sparkContext
+      .parallelize(Seq(""), 1)
+      .mapPartitionsInternal { _ =>
+        val proj = UnsafeProjection.create(Seq.empty[Expression])
+        Iterator(proj.apply(InternalRow.empty)).map { r =>
+          numOutputRows += 1
+          r
+        }
+      }
+  }
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  protected override def doExecute(): RDD[InternalRow] = rdd
+
+  override def simpleString(maxFields: Int): String = s"$nodeName[]"
+
+  override def inputRDD: RDD[InternalRow] = rdd
+
+  override protected val createUnsafeProjection: Boolean = false
+
+  override protected def doCanonicalize(): SparkPlan = {
+    super.doCanonicalize().asInstanceOf[OneRowRelationExec].copy()
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 34d24e3b1e7f1..da1ccb2532f0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -690,8 +690,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
-  protected lazy val singleRowRdd = session.sparkContext.parallelize(Seq(InternalRow()), 1)
-
   object InMemoryScans extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>
@@ -1054,7 +1052,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
          generator, g.requiredChildOutput, outer, g.qualifiedGeneratorOutput, planLater(child)) :: Nil
       case _: logical.OneRowRelation =>
-        execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
+        execution.OneRowRelationExec() :: Nil
       case r: logical.Range =>
         execution.RangeExec(r) :: Nil
       case r: logical.RepartitionByExpression =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 1ee467ef3554b..21b5177fe2208 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -56,6 +56,7 @@ trait CodegenSupport extends SparkPlan {
     case _: SortMergeJoinExec => "smj"
     case _: BroadcastNestedLoopJoinExec => "bnlj"
     case _: RDDScanExec => "rdd"
+    case _: OneRowRelationExec => "orr"
     case _: DataSourceScanExec => "scan"
     case _: InMemoryTableScanExec => "memoryScan"
     case _: WholeStageCodegenExec => "wholestagecodegen"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 138a29c6ae804..26aa4b6b5210f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, NestedCo
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, Project, RepartitionByExpression, Sort}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
-import org.apache.spark.sql.execution.{CommandResultExec, UnionExec}
+import org.apache.spark.sql.execution.{CommandResultExec, OneRowRelationExec, UnionExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate._
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -4962,6 +4962,18 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       parameters = Map("plan" -> "'Aggregate [groupingsets(Vector(0), posexplode(array(col)))]")
     )
   }
+
+  Seq(true, false).foreach { codegenEnabled =>
+    test(s"SPARK-52060: one row relation with codegen enabled - $codegenEnabled") {
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled.toString) {
+        val df = spark.sql("select 'test' stringCol")
+        checkAnswer(df, Row("test"))
+        val plan = df.queryExecution.executedPlan
+        val oneRowRelationExists = plan.find(_.isInstanceOf[OneRowRelationExec]).isDefined
+        assert(oneRowRelationExists)
+      }
+    }
+  }
 }
 
 case class Foo(bar: Option[String])